//! tempest-source-mock 0.1.1
//!
//! Tempest source implementation of a mock queue.
//!
//! Documentation
use std::cmp::min;
use std::collections::VecDeque;

use tempest_source::prelude::*;

/// Log target attached to the trace output emitted by this module.
static TARGET_SOURCE: &str = "source::MockSource";

/// Re-exports the mock source, its builder, and its options type
/// under a single path.
pub mod prelude {
    pub use super::{MockSource, MockSourceBuilder, MockSourceOptions};
}

/// Mock source builder for constructing a simple source
/// for testing topologies.
///
/// The builder starts from `MockSourceOptions::default()` (via the derived
/// `Default`) and hands a clone of its options to each source it builds.
#[derive(Default)]
pub struct MockSourceBuilder {
    // Options cloned into every `MockSource` produced by `build`.
    options: MockSourceOptions,
}

/// Chainable setters for the options carried by the builder.
///
/// Every setter takes the builder by value, overwrites one option and
/// returns the builder so calls can be chained.
impl MockSourceBuilder {
    /// Sets the maximum number of messages handed out by a single poll.
    pub fn read_msg_count(mut self, count: usize) -> Self {
        self.options.read_msg_count = Some(count);
        self
    }

    /// Sets the poll interval, in milliseconds.
    pub fn poll_interval(mut self, ms: u64) -> Self {
        self.options.poll_interval = Some(SourceInterval::Millisecond(ms));
        self
    }

    /// Sets the acknowledgement policy reported by the source.
    pub fn ack_policy(mut self, policy: SourceAckPolicy) -> Self {
        self.options.ack_policy = Some(policy);
        self
    }

    /// Sets the acknowledgement interval, in milliseconds.
    ///
    /// Without this setter the `ack_interval` option could only ever hold
    /// its default value.
    pub fn ack_interval(mut self, ms: u64) -> Self {
        self.options.ack_interval = Some(SourceInterval::Millisecond(ms));
        self
    }

    /// Sets the maximum backoff value reported by the source.
    ///
    /// The value is stored as given; the parameter name suggests
    /// milliseconds.
    pub fn max_backoff(mut self, ms: u64) -> Self {
        self.options.max_backoff = Some(ms);
        self
    }

    /// Registers a function that is called during `setup` to push messages
    /// into the mock source queue.
    ///
    /// The parameter is a plain `fn` pointer, so only functions and
    /// non-capturing closures are accepted.
    pub fn prime(mut self, f: fn(mock: &mut MockSource)) -> Self {
        self.options.prime = Some(f);
        self
    }
}

impl SourceBuilder for MockSourceBuilder {
    type Source = MockSource;

    fn build(&self) -> Self::Source {
        let mut source = Self::Source::default();
        source.options = self.options.clone();
        source
    }
}

/// Configuration for a `MockSource`.
///
/// Every option is wrapped in `Option`; `Default` fills all of them except
/// `prime`.
#[derive(Clone)]
pub struct MockSourceOptions {
    // Policy returned by the source's `ack_policy` getter.
    ack_policy: Option<SourceAckPolicy>,
    // Interval returned by the source's `ack_interval` getter.
    ack_interval: Option<SourceInterval>,
    // Upper bound on the number of messages drained from the queue per read.
    read_msg_count: Option<usize>,
    // Interval returned by the source's `poll_interval` getter.
    poll_interval: Option<SourceInterval>,
    // Value returned by the source's `max_backoff` getter
    // (presumably milliseconds: the builder argument is named `ms`).
    max_backoff: Option<u64>,
    // Optional function called with the source during `setup`.
    prime: Option<fn(mock: &mut MockSource)>,
}

impl Default for MockSourceOptions {
    /// Returns the stock option set: batch acks of 10 every 100 ms, up to
    /// 10 messages per read, a 1 ms poll interval, a max backoff of 1000
    /// and no priming function.
    fn default() -> Self {
        Self {
            ack_policy: Some(SourceAckPolicy::Batch(10)),
            ack_interval: Some(SourceInterval::Millisecond(100)),
            read_msg_count: Some(10),
            poll_interval: Some(SourceInterval::Millisecond(1)),
            max_backoff: Some(1000),
            prime: None,
        }
    }
}

/// A mock source which should be used for testing purposes.
/// This source mocks pushing & polling messages from a queue.
pub struct MockSource {
    // Options controlling read size, intervals, ack policy and priming.
    options: MockSourceOptions,
    /// Pending messages; reads drain from the front of this queue.
    pub queue: VecDeque<SourceMsg>,
    // Running count of messages acknowledged through `ack` / `batch_ack`.
    acked: usize,
}

impl Default for MockSource {
    /// Creates a source with default options, an empty queue and an ack
    /// counter of zero.
    fn default() -> Self {
        let options = MockSourceOptions::default();
        Self {
            options,
            queue: VecDeque::new(),
            acked: 0,
        }
    }
}

impl MockSource {
    /// Drains up to `read_msg_count` messages from the front of the queue.
    ///
    /// Returns `Ok(None)` when nothing would be drained (empty queue or a
    /// read count of zero), otherwise `Ok(Some(batch))`.
    ///
    /// # Panics
    ///
    /// Panics if the `read_msg_count` option is `None`.
    fn read(&mut self) -> SourcePollResult {
        let limit = *self.options.read_msg_count.as_ref().unwrap();
        let take = min(self.queue.len(), limit);
        if take == 0 {
            return Ok(None);
        }
        let batch: Vec<SourceMsg> = self.queue.drain(..take).collect();
        Ok(Some(batch))
    }
}

impl Source for MockSource {
    fn name(&self) -> &'static str {
        "MockSource"
    }

    fn setup(&mut self) -> SourceResult<()> {
        match self.options.prime {
            Some(f) => f(self),
            None => {}
        }
        Ok(())
    }

    fn ack(&mut self, _msg_id: MsgId) -> SourceResult<(i32, i32)> {
        self.acked += 1;
        Ok((1, 1))
    }

    fn batch_ack(&mut self, msgs: Vec<MsgId>) -> SourceResult<(i32, i32)> {
        self.acked += msgs.len();
        trace!(target: TARGET_SOURCE, "acked total: {}", &self.acked);
        Ok((msgs.len() as i32, msgs.len() as i32))
    }

    fn max_backoff(&self) -> SourceResult<&u64> {
        match &self.options.max_backoff {
            Some(v) => Ok(v),
            None => Source::max_backoff(self),
        }
    }

    fn ack_policy(&self) -> SourceResult<&SourceAckPolicy> {
        match &self.options.ack_policy {
            Some(v) => Ok(v),
            None => Source::ack_policy(self),
        }
    }

    fn ack_interval(&self) -> SourceResult<&SourceInterval> {
        match self.options.ack_interval {
            Some(ref v) => Ok(v),
            None => Source::ack_interval(self),
        }
    }

    fn poll_interval(&self) -> SourceResult<&SourceInterval> {
        match self.options.poll_interval {
            Some(ref v) => Ok(v),
            None => Source::poll_interval(self),
        }
    }

    fn poll(&mut self) -> SourcePollResult {
        self.read()
    }
}