kcl 0.3.3

a Rust interface to the Amazon Kinesis Client Library (KCL) MultiLangDaemon
Documentation
use crate::mocks::mock_reader::MockReader;
use crate::mocks::mock_writer::MockWriter;
use kcl::checkpointer::Checkpointer;
use kcl::{Processor, Record};

#[derive(Default)]
pub struct MockProcessor {
    pub shard: Option<String>,
    pub records: Vec<Record>,
    pub lease_lost: bool,
    pub shard_ended: bool,
    pub shutdown_requested: bool,
}

impl Processor<MockWriter, MockReader> for MockProcessor {
    fn initialize(&mut self, shard_id: &str) {
        self.shard = Some(shard_id.to_owned())
    }

    fn process_records(
        &mut self,
        data: &[Record],
        _checkpointer: &mut Checkpointer<MockWriter, MockReader>,
    ) {
        for record in data {
            self.records.push((*record).clone())
        }
    }

    fn lease_lost(&mut self) {
        self.lease_lost = true;
    }
    fn shard_ended(&mut self, _checkpointer: &mut Checkpointer<MockWriter, MockReader>) {
        self.shard_ended = true;
    }
    fn shutdown_requested(&mut self, _checkpointer: &mut Checkpointer<MockWriter, MockReader>) {
        self.shutdown_requested = true;
    }
}

#[derive(Default)]
pub struct MockCheckpointingProcessor {
    pub shard: Option<String>,
    pub records: Vec<Record>,
    pub lease_lost: bool,
    pub shard_ended: bool,
    pub shutdown_requested: bool,
}

impl Processor<MockWriter, MockReader> for MockCheckpointingProcessor {
    fn initialize(&mut self, shard_id: &str) {
        self.shard = Some(shard_id.to_owned())
    }

    fn process_records(
        &mut self,
        data: &[Record],
        checkpointer: &mut Checkpointer<MockWriter, MockReader>,
    ) {
        for record in data {
            self.records.push((*record).clone())
        }
        match checkpointer.checkpoint(None, None) {
            Ok(_) => {}
            Err(error) => {
                panic!("{error}, retryable: {}", error.is_retryable())
            }
        };
    }

    fn lease_lost(&mut self) {
        self.lease_lost = true;
    }
    fn shard_ended(&mut self, _checkpointer: &mut Checkpointer<MockWriter, MockReader>) {
        self.shard_ended = true;
    }
    fn shutdown_requested(&mut self, _checkpointer: &mut Checkpointer<MockWriter, MockReader>) {
        self.shutdown_requested = true;
    }
}