kcl 0.3.3

a Rust interface to the Amazon Kinesis Client Library (KCL) MultiLangDaemon
Documentation
use crate::writer::OutputWriter;

use crate::messages::CheckpointError::UnexpectedResponse;
use crate::messages::Message::Checkpoint;
use crate::messages::{parse_message, CheckpointError, CheckpointWithErrorPayload};
use crate::reader::InputReader;
use serde::Serialize;

pub struct Checkpointer<'a, W: OutputWriter, R: InputReader> {
    writer: &'a mut W,
    reader: &'a mut R,
}

impl<'a, W: OutputWriter, R: InputReader> Checkpointer<'a, W, R> {
    pub(crate) fn new(writer: &'a mut W, reader: &'a mut R) -> Self {
        Self { writer, reader }
    }

    /// Checkpoints at a particular sequence number you provide or if no sequence number is given, the checkpoint will
    /// be at the end of the most recently delivered list of records.
    ///
    /// # Arguments
    ///
    /// * `sequence_number`: the sequence number to checkpoint
    /// * `sub_sequence_number`: the sub sequence number generated by KPL
    ///
    /// returns: Result<(), Report>
    ///
    pub fn checkpoint(
        &mut self,
        sequence_number: Option<String>,
        sub_sequence_number: Option<u64>,
    ) -> Result<(), CheckpointError> {
        let message = CheckpointMessage {
            action: "checkpoint".to_string(),
            sequence_number,
            sub_sequence_number,
        };
        let mut payload = serde_json::to_vec(&message)?;
        payload.push(b'\n');
        self.writer.write(payload.as_slice())?;
        let next = self.reader.next()?;
        let message = parse_message(&next)?;
        match message {
            Checkpoint(CheckpointWithErrorPayload {
                checkpoint: _,
                error,
            }) => match error {
                None => Ok(()),
                Some(error) => Err(error),
            },
            _ => Err(UnexpectedResponse),
        }
    }
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CheckpointMessage {
    action: String,
    sequence_number: Option<String>,
    sub_sequence_number: Option<u64>,
}