1use super::{
2 message::input::Message as MessageIn, message::output::Message as MessageOut,
3 transport::Transport,
4};
5
6#[derive(Debug, thiserror::Error)]
7pub enum CheckpointError<TransportError> {
8 #[error(transparent)]
9 TransportError(TransportError),
10
11 #[error("failed to checkpoint: {reason}")]
12 Failed { reason: String },
13
14 #[error("invalid state: {}", message.id())]
15 InvalidState { message: MessageIn },
16}
17
18pub struct Checkpointer<'a, T>(&'a mut T);
19
20impl<'a, T> Checkpointer<'a, T>
21where
22 T: Transport,
23{
24 pub(crate) fn new(transport: &'a mut T) -> Self {
25 Self(transport)
26 }
27
28 pub async fn checkpoint(
29 &mut self,
30 sequence_number: Option<String>,
31 sub_sequence_number: Option<u64>,
32 ) -> Result<(), CheckpointError<T::Error>> {
33 let request = MessageOut::Checkpoint(super::message::output::CheckpointMessage {
34 sequence_number,
35 sub_sequence_number,
36 });
37
38 self.0
39 .write_message(&request)
40 .await
41 .map_err(CheckpointError::TransportError)?;
42
43 let response = self
44 .0
45 .read_message()
46 .await
47 .map_err(CheckpointError::TransportError)?;
48
49 if let MessageIn::Checkpoint(msg) = response {
50 if let Some(error) = msg.error {
51 Err(CheckpointError::Failed { reason: error })
52 } else {
53 Ok(())
54 }
55 } else {
56 Err(CheckpointError::InvalidState { message: response })
57 }
58 }
59}