// kcl_async/checkpoint.rs

1use super::{
2    message::input::Message as MessageIn, message::output::Message as MessageOut,
3    transport::Transport,
4};
5
6#[derive(Debug, thiserror::Error)]
7pub enum CheckpointError<TransportError> {
8    #[error(transparent)]
9    TransportError(TransportError),
10
11    #[error("failed to checkpoint: {reason}")]
12    Failed { reason: String },
13
14    #[error("invalid state: {}", message.id())]
15    InvalidState { message: MessageIn },
16}
17
18pub struct Checkpointer<'a, T>(&'a mut T);
19
20impl<'a, T> Checkpointer<'a, T>
21where
22    T: Transport,
23{
24    pub(crate) fn new(transport: &'a mut T) -> Self {
25        Self(transport)
26    }
27
28    pub async fn checkpoint(
29        &mut self,
30        sequence_number: Option<String>,
31        sub_sequence_number: Option<u64>,
32    ) -> Result<(), CheckpointError<T::Error>> {
33        let request = MessageOut::Checkpoint(super::message::output::CheckpointMessage {
34            sequence_number,
35            sub_sequence_number,
36        });
37
38        self.0
39            .write_message(&request)
40            .await
41            .map_err(CheckpointError::TransportError)?;
42
43        let response = self
44            .0
45            .read_message()
46            .await
47            .map_err(CheckpointError::TransportError)?;
48
49        if let MessageIn::Checkpoint(msg) = response {
50            if let Some(error) = msg.error {
51                Err(CheckpointError::Failed { reason: error })
52            } else {
53                Ok(())
54            }
55        } else {
56            Err(CheckpointError::InvalidState { message: response })
57        }
58    }
59}