tempest_source_mock/
lib.rs1use std::cmp::min;
2use std::collections::VecDeque;
3
4use tempest_source::prelude::*;
5
6static TARGET_SOURCE: &'static str = "source::MockSource";
7
8pub mod prelude {
9 pub use super::{MockSource, MockSourceBuilder, MockSourceOptions};
10}
11
12#[derive(Default)]
15pub struct MockSourceBuilder {
16 options: MockSourceOptions,
17}
18
19impl MockSourceBuilder {
20 pub fn read_msg_count(mut self, count: usize) -> Self {
21 self.options.read_msg_count = Some(count);
22 self
23 }
24
25 pub fn poll_interval(mut self, ms: u64) -> Self {
26 self.options.poll_interval = Some(SourceInterval::Millisecond(ms));
27 self
28 }
29
30 pub fn ack_policy(mut self, policy: SourceAckPolicy) -> Self {
31 self.options.ack_policy = Some(policy);
32 self
33 }
34
35 pub fn max_backoff(mut self, ms: u64) -> Self {
36 self.options.max_backoff = Some(ms);
37 self
38 }
39
40 pub fn prime(mut self, f: fn(mock: &mut MockSource)) -> Self {
43 self.options.prime = Some(f);
44 self
45 }
46}
47
48impl SourceBuilder for MockSourceBuilder {
49 type Source = MockSource;
50
51 fn build(&self) -> Self::Source {
52 let mut source = Self::Source::default();
53 source.options = self.options.clone();
54 source
55 }
56}
57
58#[derive(Clone)]
59pub struct MockSourceOptions {
60 ack_policy: Option<SourceAckPolicy>,
61 ack_interval: Option<SourceInterval>,
62 read_msg_count: Option<usize>,
63 poll_interval: Option<SourceInterval>,
64 max_backoff: Option<u64>,
65 prime: Option<fn(mock: &mut MockSource)>,
66}
67
68impl Default for MockSourceOptions {
69 fn default() -> Self {
70 MockSourceOptions {
71 read_msg_count: Some(10usize),
72 poll_interval: Some(SourceInterval::Millisecond(1u64)),
73 ack_policy: Some(SourceAckPolicy::Batch(10)),
74 ack_interval: Some(SourceInterval::Millisecond(100u64)),
75 max_backoff: Some(1000u64),
76 prime: None,
77 }
78 }
79}
80
81pub struct MockSource {
84 options: MockSourceOptions,
85 pub queue: VecDeque<SourceMsg>,
86 acked: usize,
87}
88
89impl Default for MockSource {
90 fn default() -> Self {
91 MockSource {
92 options: MockSourceOptions::default(),
93 queue: VecDeque::new(),
94 acked: 0,
95 }
96 }
97}
98
99impl MockSource {
100 fn read(&mut self) -> SourcePollResult {
101 let count = self.options.read_msg_count.as_ref().unwrap();
102 let len = self.queue.len();
103 if len > 0 {
104 let msgs = self
105 .queue
106 .drain(..min(len, *count))
107 .collect::<Vec<SourceMsg>>();
108 if msgs.len() > 0 {
109 Ok(Some(msgs))
110 } else {
111 Ok(None)
112 }
113 } else {
114 Ok(None)
115 }
116 }
117}
118
119impl Source for MockSource {
120 fn name(&self) -> &'static str {
121 "MockSource"
122 }
123
124 fn setup(&mut self) -> SourceResult<()> {
125 match self.options.prime {
126 Some(f) => f(self),
127 None => {}
128 }
129 Ok(())
130 }
131
132 fn ack(&mut self, _msg_id: MsgId) -> SourceResult<(i32, i32)> {
133 self.acked += 1;
134 Ok((1, 1))
135 }
136
137 fn batch_ack(&mut self, msgs: Vec<MsgId>) -> SourceResult<(i32, i32)> {
138 self.acked += msgs.len();
139 trace!(target: TARGET_SOURCE, "acked total: {}", &self.acked);
140 Ok((msgs.len() as i32, msgs.len() as i32))
141 }
142
143 fn max_backoff(&self) -> SourceResult<&u64> {
144 match &self.options.max_backoff {
145 Some(v) => Ok(v),
146 None => Source::max_backoff(self),
147 }
148 }
149
150 fn ack_policy(&self) -> SourceResult<&SourceAckPolicy> {
151 match &self.options.ack_policy {
152 Some(v) => Ok(v),
153 None => Source::ack_policy(self),
154 }
155 }
156
157 fn ack_interval(&self) -> SourceResult<&SourceInterval> {
158 match self.options.ack_interval {
159 Some(ref v) => Ok(v),
160 None => Source::ack_interval(self),
161 }
162 }
163
164 fn poll_interval(&self) -> SourceResult<&SourceInterval> {
165 match self.options.poll_interval {
166 Some(ref v) => Ok(v),
167 None => Source::poll_interval(self),
168 }
169 }
170
171 fn poll(&mut self) -> SourcePollResult {
172 self.read()
173 }
174}