mod mocks;
use crate::mocks::mock_processor::MockCheckpointingProcessor;
use crate::mocks::mock_reader::MockReader;
use crate::mocks::mock_writer::MockWriter;
use kcl::tick;
fn tick_into_processor(
message: &str,
checkpoint_response: &str,
) -> (MockCheckpointingProcessor, MockWriter) {
let mut processor = MockCheckpointingProcessor::default();
let mut reader = MockReader::with_input(message.to_string());
reader.add_input(checkpoint_response.to_string());
let mut writer = MockWriter::default();
tick(&mut processor, &mut reader, &mut writer).unwrap();
(processor, writer)
}
#[test]
fn test_checkpoint_success() {
let message = "{\"action\" :\"processRecords\", \"records\": []}";
let checkpoint_response = "{\"action\":\"checkpoint\",\"checkpoint\":null,\"error\":null}";
let (_processor, mut writer) = tick_into_processor(message, checkpoint_response);
assert_eq!(writer.outputs.len(), 2);
let expected_process_records_status =
"{\"action\":\"status\",\"responseFor\":\"processRecords\"}\n";
assert_eq!(
writer.outputs.pop(),
Some(expected_process_records_status.to_string())
);
let expected_checkpoint_status =
"{\"action\":\"checkpoint\",\"sequenceNumber\":null,\"subSequenceNumber\":null}\n";
assert_eq!(
writer.outputs.pop(),
Some(expected_checkpoint_status.to_string())
);
}
#[test]
#[should_panic(expected = "InvalidStateException, retryable: false")]
fn test_checkpoint_invalid_state_exception() {
let message = "{\"action\" :\"processRecords\", \"records\": []}";
let checkpoint_response =
"{\"action\":\"checkpoint\",\"checkpoint\":null,\"error\":\"InvalidStateException\"}";
let (_processor, mut writer) = tick_into_processor(message, checkpoint_response);
let expected_checkpoint_status =
"{\"action\":\"checkpoint\",\"sequenceNumber\":null,\"subSequenceNumber\":null}\n";
assert_eq!(
writer.outputs.pop(),
Some(expected_checkpoint_status.to_string())
);
}
#[test]
#[should_panic(expected = "KinesisClientLibDependencyException, retryable: true")]
fn test_checkpoint_kcl_dep_exception() {
let message = "{\"action\" :\"processRecords\", \"records\": []}";
let checkpoint_response =
"{\"action\":\"checkpoint\",\"checkpoint\":null,\"error\":\"KinesisClientLibDependencyException\"}";
let (_processor, mut writer) = tick_into_processor(message, checkpoint_response);
let expected_checkpoint_status =
"{\"action\":\"checkpoint\",\"sequenceNumber\":null,\"subSequenceNumber\":null}\n";
assert_eq!(
writer.outputs.pop(),
Some(expected_checkpoint_status.to_string())
);
}
#[test]
#[should_panic(expected = "ThrottlingException, retryable: true")]
fn test_checkpoint_throttle_exception() {
let message = "{\"action\" :\"processRecords\", \"records\": []}";
let checkpoint_response =
"{\"action\":\"checkpoint\",\"checkpoint\":null,\"error\":\"ThrottlingException\"}";
let (_processor, mut writer) = tick_into_processor(message, checkpoint_response);
let expected_checkpoint_status =
"{\"action\":\"checkpoint\",\"sequenceNumber\":null,\"subSequenceNumber\":null}\n";
assert_eq!(
writer.outputs.pop(),
Some(expected_checkpoint_status.to_string())
);
}
#[test]
#[should_panic(expected = "ShutdownException, retryable: false")]
fn test_checkpoint_shutdown_exception() {
let message = "{\"action\" :\"processRecords\", \"records\": []}";
let checkpoint_response =
"{\"action\":\"checkpoint\",\"checkpoint\":null,\"error\":\"ShutdownException\"}";
let (_processor, mut writer) = tick_into_processor(message, checkpoint_response);
let expected_checkpoint_status =
"{\"action\":\"checkpoint\",\"sequenceNumber\":null,\"subSequenceNumber\":null}\n";
assert_eq!(
writer.outputs.pop(),
Some(expected_checkpoint_status.to_string())
);
}
#[test]
#[should_panic(expected = "Exception: \"check your point\", retryable: false")]
fn test_checkpoint_unknown_error_exception() {
let message = "{\"action\" :\"processRecords\", \"records\": []}";
let checkpoint_response =
"{\"action\":\"checkpoint\",\"checkpoint\":null,\"error\":\"check your point\"}";
let (_processor, mut writer) = tick_into_processor(message, checkpoint_response);
let expected_checkpoint_status =
"{\"action\":\"checkpoint\",\"sequenceNumber\":null,\"subSequenceNumber\":null}\n";
assert_eq!(
writer.outputs.pop(),
Some(expected_checkpoint_status.to_string())
);
}
#[test]
#[should_panic(expected = "UnexpectedResponse, retryable: false")]
fn test_checkpoint_unexpected_response_exception() {
let message = "{\"action\" :\"processRecords\", \"records\": []}";
let checkpoint_response = "{\"action\" :\"processRecords\", \"records\": []}";
let (_processor, mut writer) = tick_into_processor(message, checkpoint_response);
let expected_checkpoint_status =
"{\"action\":\"checkpoint\",\"sequenceNumber\":null,\"subSequenceNumber\":null}\n";
assert_eq!(
writer.outputs.pop(),
Some(expected_checkpoint_status.to_string())
);
}