tempest_source_mock/
lib.rs

1use std::cmp::min;
2use std::collections::VecDeque;
3
4use tempest_source::prelude::*;
5
/// Log target attached to trace output emitted by the mock source.
static TARGET_SOURCE: &str = "source::MockSource";
7
8pub mod prelude {
9    pub use super::{MockSource, MockSourceBuilder, MockSourceOptions};
10}
11
12/// Mock source builder for constructing a simple source
13/// for testing topologies
14#[derive(Default)]
15pub struct MockSourceBuilder {
16    options: MockSourceOptions,
17}
18
19impl MockSourceBuilder {
20    pub fn read_msg_count(mut self, count: usize) -> Self {
21        self.options.read_msg_count = Some(count);
22        self
23    }
24
25    pub fn poll_interval(mut self, ms: u64) -> Self {
26        self.options.poll_interval = Some(SourceInterval::Millisecond(ms));
27        self
28    }
29
30    pub fn ack_policy(mut self, policy: SourceAckPolicy) -> Self {
31        self.options.ack_policy = Some(policy);
32        self
33    }
34
35    pub fn max_backoff(mut self, ms: u64) -> Self {
36        self.options.max_backoff = Some(ms);
37        self
38    }
39
40    /// Prime takes a closure which is called to push messages into
41    /// the mock source queue.
42    pub fn prime(mut self, f: fn(mock: &mut MockSource)) -> Self {
43        self.options.prime = Some(f);
44        self
45    }
46}
47
48impl SourceBuilder for MockSourceBuilder {
49    type Source = MockSource;
50
51    fn build(&self) -> Self::Source {
52        let mut source = Self::Source::default();
53        source.options = self.options.clone();
54        source
55    }
56}
57
58#[derive(Clone)]
59pub struct MockSourceOptions {
60    ack_policy: Option<SourceAckPolicy>,
61    ack_interval: Option<SourceInterval>,
62    read_msg_count: Option<usize>,
63    poll_interval: Option<SourceInterval>,
64    max_backoff: Option<u64>,
65    prime: Option<fn(mock: &mut MockSource)>,
66}
67
68impl Default for MockSourceOptions {
69    fn default() -> Self {
70        MockSourceOptions {
71            read_msg_count: Some(10usize),
72            poll_interval: Some(SourceInterval::Millisecond(1u64)),
73            ack_policy: Some(SourceAckPolicy::Batch(10)),
74            ack_interval: Some(SourceInterval::Millisecond(100u64)),
75            max_backoff: Some(1000u64),
76            prime: None,
77        }
78    }
79}
80
81/// A  mock source which should be used for testing purposes.
82/// This source mocks pushing & polling messages from a queue.
83pub struct MockSource {
84    options: MockSourceOptions,
85    pub queue: VecDeque<SourceMsg>,
86    acked: usize,
87}
88
89impl Default for MockSource {
90    fn default() -> Self {
91        MockSource {
92            options: MockSourceOptions::default(),
93            queue: VecDeque::new(),
94            acked: 0,
95        }
96    }
97}
98
99impl MockSource {
100    fn read(&mut self) -> SourcePollResult {
101        let count = self.options.read_msg_count.as_ref().unwrap();
102        let len = self.queue.len();
103        if len > 0 {
104            let msgs = self
105                .queue
106                .drain(..min(len, *count))
107                .collect::<Vec<SourceMsg>>();
108            if msgs.len() > 0 {
109                Ok(Some(msgs))
110            } else {
111                Ok(None)
112            }
113        } else {
114            Ok(None)
115        }
116    }
117}
118
119impl Source for MockSource {
120    fn name(&self) -> &'static str {
121        "MockSource"
122    }
123
124    fn setup(&mut self) -> SourceResult<()> {
125        match self.options.prime {
126            Some(f) => f(self),
127            None => {}
128        }
129        Ok(())
130    }
131
132    fn ack(&mut self, _msg_id: MsgId) -> SourceResult<(i32, i32)> {
133        self.acked += 1;
134        Ok((1, 1))
135    }
136
137    fn batch_ack(&mut self, msgs: Vec<MsgId>) -> SourceResult<(i32, i32)> {
138        self.acked += msgs.len();
139        trace!(target: TARGET_SOURCE, "acked total: {}", &self.acked);
140        Ok((msgs.len() as i32, msgs.len() as i32))
141    }
142
143    fn max_backoff(&self) -> SourceResult<&u64> {
144        match &self.options.max_backoff {
145            Some(v) => Ok(v),
146            None => Source::max_backoff(self),
147        }
148    }
149
150    fn ack_policy(&self) -> SourceResult<&SourceAckPolicy> {
151        match &self.options.ack_policy {
152            Some(v) => Ok(v),
153            None => Source::ack_policy(self),
154        }
155    }
156
157    fn ack_interval(&self) -> SourceResult<&SourceInterval> {
158        match self.options.ack_interval {
159            Some(ref v) => Ok(v),
160            None => Source::ack_interval(self),
161        }
162    }
163
164    fn poll_interval(&self) -> SourceResult<&SourceInterval> {
165        match self.options.poll_interval {
166            Some(ref v) => Ok(v),
167            None => Source::poll_interval(self),
168        }
169    }
170
171    fn poll(&mut self) -> SourcePollResult {
172        self.read()
173    }
174}