kcl 0.3.3

A Rust interface to the Amazon Kinesis Client Library (KCL) MultiLangDaemon.
Documentation
mod mocks;

use kcl::tick;

use crate::mocks::mock_processor::MockProcessor;
use crate::mocks::mock_reader::MockReader;
use crate::mocks::mock_writer::MockWriter;

/// Drives a single [`tick`] using `message` as the reader's input.
///
/// A default `MockProcessor` and `MockWriter` are created, together with a
/// `MockReader` seeded with an owned copy of `message`. Both the processor and
/// the writer are handed back so the caller can inspect what the tick did.
///
/// # Panics
///
/// Panics if `tick` returns an error.
fn tick_into_processor(message: &str) -> (MockProcessor, MockWriter) {
    let mut input = MockReader::with_input(message.to_string());
    let mut output = MockWriter::default();
    let mut handler = MockProcessor::default();

    tick(&mut handler, &mut input, &mut output).unwrap();

    (handler, output)
}

/// Asserts that the most recent entry in `writer.outputs` is the
/// newline-terminated status JSON whose `responseFor` field equals `status`.
///
/// # Panics
///
/// Panics when the writer has no outputs or when the last one differs from
/// the expected line.
fn assert_status_response(writer: &MockWriter, status: &str) {
    let latest = writer.outputs.last();
    let expected_line = format!("{{\"action\":\"status\",\"responseFor\":\"{status}\"}}\n");

    assert_eq!(latest, Some(&expected_line));
}

/// An `initialize` message should record the shard id on the processor and
/// produce exactly one output: the matching status response.
#[test]
fn test_tick_initialize() {
    let (handler, sink) =
        tick_into_processor("{\"action\" :\"initialize\", \"shardId\": \"shard1\"}");

    // Compare by `&str` so no owned `String` has to be built for the expectation.
    assert_eq!(handler.shard.as_deref(), Some("shard1"));
    assert_eq!(sink.outputs.len(), 1);
    assert_status_response(&sink, "initialize");
}

/// A `processRecords` message carrying one record should reach the processor
/// with its `raw_data` reading as the expected UTF-8 text, and produce exactly
/// one output: the matching status response.
#[test]
fn test_tick_new_record() {
    let payload = "{\"action\": \"processRecords\", \
        \"records\": [{\
            \"data\": \"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==\",\
            \"partitionKey\": \"1\",\
            \"sequenceNumber\": \"49590338271490256608559692538361571095921575989136588898\",\
            \"approximateArrivalTimestamp\": 1570887011763.01}]}";
    let (handler, sink) = tick_into_processor(payload);

    // Inspect the most recently delivered record and view its bytes as text.
    let latest = handler.records.last().unwrap();
    let text = std::str::from_utf8(latest.raw_data.as_slice()).unwrap();

    assert_eq!(text, "Hello, this is a test.");
    assert_eq!(sink.outputs.len(), 1);
    assert_status_response(&sink, "processRecords");
}

/// A `leaseLost` message should set the processor's `lease_lost` flag and
/// produce exactly one output: the matching status response.
#[test]
fn test_tick_lease_lost() {
    let (handler, sink) = tick_into_processor("{\"action\": \"leaseLost\"}");

    assert!(handler.lease_lost);
    assert_eq!(sink.outputs.len(), 1);
    assert_status_response(&sink, "leaseLost");
}

/// A `shardEnded` message should set the processor's `shard_ended` flag and
/// produce exactly one output: the matching status response.
#[test]
fn test_tick_shard_ended() {
    let (handler, sink) =
        tick_into_processor("{\"action\": \"shardEnded\", \"checkpoint\": \"1234\"}");

    assert!(handler.shard_ended);
    assert_eq!(sink.outputs.len(), 1);
    assert_status_response(&sink, "shardEnded");
}

/// A `shutdownRequested` message should set the processor's
/// `shutdown_requested` flag and produce exactly one output: the matching
/// status response.
#[test]
fn test_tick_shutdown_requested() {
    let (handler, sink) =
        tick_into_processor("{\"action\": \"shutdownRequested\", \"checkpoint\": \"1234\"}");

    assert!(handler.shutdown_requested);
    assert_eq!(sink.outputs.len(), 1);
    assert_status_response(&sink, "shutdownRequested");
}