use crate::writer::OutputWriter;
use crate::messages::CheckpointError::UnexpectedResponse;
use crate::messages::Message::Checkpoint;
use crate::messages::{parse_message, CheckpointError, CheckpointWithErrorPayload};
use crate::reader::InputReader;
use serde::Serialize;
/// Handle that pairs a mutable borrow of an output writer with a mutable
/// borrow of an input reader for the lifetime `'a`.
///
/// The accompanying `impl` uses the writer to send a checkpoint request and
/// the reader to fetch the reply to that request.
pub struct Checkpointer<'a, W, R>
where
    W: OutputWriter,
    R: InputReader,
{
    /// Destination for outgoing checkpoint requests.
    writer: &'a mut W,
    /// Source from which the reply to a checkpoint request is read.
    reader: &'a mut R,
}
impl<'a, W: OutputWriter, R: InputReader> Checkpointer<'a, W, R> {
pub(crate) fn new(writer: &'a mut W, reader: &'a mut R) -> Self {
Self { writer, reader }
}
pub fn checkpoint(
&mut self,
sequence_number: Option<String>,
sub_sequence_number: Option<u64>,
) -> Result<(), CheckpointError> {
let message = CheckpointMessage {
action: "checkpoint".to_string(),
sequence_number,
sub_sequence_number,
};
let mut payload = serde_json::to_vec(&message)?;
payload.push(b'\n');
self.writer.write(payload.as_slice())?;
let next = self.reader.next()?;
let message = parse_message(&next)?;
match message {
Checkpoint(CheckpointWithErrorPayload {
checkpoint: _,
error,
}) => match error {
None => Ok(()),
Some(error) => Err(error),
},
_ => Err(UnexpectedResponse),
}
}
}
/// Outgoing checkpoint request as it is serialized to JSON.
///
/// `rename_all = "camelCase"` makes serde emit the field names as `action`,
/// `sequenceNumber` and `subSequenceNumber`.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CheckpointMessage {
/// Name of the requested action; `Checkpointer::checkpoint` sets it to `"checkpoint"`.
action: String,
/// Optional sequence number, passed through unchanged from the caller.
sequence_number: Option<String>,
/// Optional sub-sequence number, passed through unchanged from the caller.
sub_sequence_number: Option<u64>,
}