liminal/durability/
cursor.rs1use super::{CheckpointPolicy, DurabilityConfig, DurabilityError, DurableStore};
2
/// Position of one consumer within one partition.
///
/// The cursor is identified by the `(consumer_id, partition_key)` pair and
/// carries the offset most recently accepted for that pair.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ConsumerCursor {
    /// Identifier of the consumer that owns this cursor.
    pub consumer_id: String,
    /// Key of the partition this cursor tracks.
    pub partition_key: String,
    /// Offset currently held by the cursor; `0` for a cursor that has never
    /// been checkpointed.
    pub current_offset: u64,
}
13
14impl ConsumerCursor {
15 #[must_use]
17 pub fn new(consumer_id: impl Into<String>, partition_key: impl Into<String>) -> Self {
18 Self::from_persisted(consumer_id, partition_key, 0)
19 }
20
21 #[must_use]
23 pub fn from_persisted(
24 consumer_id: impl Into<String>,
25 partition_key: impl Into<String>,
26 current_offset: u64,
27 ) -> Self {
28 Self {
29 consumer_id: consumer_id.into(),
30 partition_key: partition_key.into(),
31 current_offset,
32 }
33 }
34
35 pub async fn resume(
41 consumer_id: impl Into<String>,
42 partition_key: impl Into<String>,
43 store: &dyn DurableStore,
44 ) -> Result<Self, DurabilityError> {
45 let consumer_id = consumer_id.into();
46 let partition_key = partition_key.into();
47 let cursor_key = cursor_key_for(&consumer_id, &partition_key);
48 let current_offset = store.read_value(&cursor_key).await?.unwrap_or(0);
49 Ok(Self::from_persisted(
50 consumer_id,
51 partition_key,
52 current_offset,
53 ))
54 }
55
56 #[must_use]
58 pub fn consumer_id(&self) -> &str {
59 &self.consumer_id
60 }
61
62 #[must_use]
64 pub fn partition_key(&self) -> &str {
65 &self.partition_key
66 }
67
68 #[must_use]
70 pub const fn current_offset(&self) -> u64 {
71 self.current_offset
72 }
73
74 #[must_use]
76 pub fn cursor_key(&self) -> String {
77 cursor_key_for(&self.consumer_id, &self.partition_key)
78 }
79
80 pub async fn checkpoint(
87 &mut self,
88 store: &dyn DurableStore,
89 new_offset: u64,
90 ) -> Result<(), DurabilityError> {
91 if new_offset < self.current_offset {
92 return Err(DurabilityError::CursorRegression {
93 stored: self.current_offset,
94 attempted: new_offset,
95 });
96 }
97
98 store
99 .cas(&self.cursor_key(), self.current_offset, new_offset)
100 .await?;
101 self.current_offset = new_offset;
102 Ok(())
103 }
104}
105
106#[derive(Clone, Debug, PartialEq, Eq)]
108pub struct CheckpointDriver {
109 policy: CheckpointPolicy,
110 messages_since_last_checkpoint: usize,
111 pending_offset: Option<u64>,
112}
113
114impl CheckpointDriver {
115 #[must_use]
117 pub fn new(policy: impl Into<CheckpointPolicy>) -> Self {
118 Self::from_policy(policy.into())
119 }
120
121 #[must_use]
123 pub const fn from_policy(policy: CheckpointPolicy) -> Self {
124 Self {
125 policy,
126 messages_since_last_checkpoint: 0,
127 pending_offset: None,
128 }
129 }
130
131 #[must_use]
133 pub const fn from_config(config: DurabilityConfig) -> Self {
134 Self::from_policy(config.checkpoint_policy())
135 }
136
137 #[must_use]
139 pub const fn policy(&self) -> CheckpointPolicy {
140 self.policy
141 }
142
143 #[must_use]
145 pub const fn messages_since_last_checkpoint(&self) -> usize {
146 self.messages_since_last_checkpoint
147 }
148
149 #[must_use]
151 pub const fn pending_offset(&self) -> Option<u64> {
152 self.pending_offset
153 }
154
155 pub async fn record_processed(
165 &mut self,
166 cursor: &mut ConsumerCursor,
167 store: &dyn DurableStore,
168 next_offset: u64,
169 ) -> Result<(), DurabilityError> {
170 self.validate_policy()?;
171 self.messages_since_last_checkpoint = self
172 .messages_since_last_checkpoint
173 .checked_add(1)
174 .ok_or_else(|| {
175 DurabilityError::ConfigError("messages since last checkpoint overflow".to_owned())
176 })?;
177 self.pending_offset = Some(next_offset);
178
179 match self.policy {
180 CheckpointPolicy::PerMessage => self.checkpoint_pending(cursor, store).await,
181 CheckpointPolicy::PerBatch(batch_size)
182 if self.messages_since_last_checkpoint >= batch_size =>
183 {
184 self.checkpoint_pending(cursor, store).await
185 }
186 CheckpointPolicy::PerBatch(_) | CheckpointPolicy::ExplicitFlush => Ok(()),
187 }
188 }
189
190 pub async fn flush(
197 &mut self,
198 cursor: &mut ConsumerCursor,
199 store: &dyn DurableStore,
200 ) -> Result<(), DurabilityError> {
201 self.validate_policy()?;
202 self.checkpoint_pending(cursor, store).await
203 }
204
205 async fn checkpoint_pending(
206 &mut self,
207 cursor: &mut ConsumerCursor,
208 store: &dyn DurableStore,
209 ) -> Result<(), DurabilityError> {
210 let Some(next_offset) = self.pending_offset else {
211 return Ok(());
212 };
213 cursor.checkpoint(store, next_offset).await?;
214 self.messages_since_last_checkpoint = 0;
215 self.pending_offset = None;
216 Ok(())
217 }
218
219 fn validate_policy(&self) -> Result<(), DurabilityError> {
220 if self.policy == CheckpointPolicy::PerBatch(0) {
221 return Err(DurabilityError::ConfigError(
222 "checkpoint batch size must be at least 1".to_owned(),
223 ));
224 }
225 Ok(())
226 }
227}
228
229impl From<DurabilityConfig> for CheckpointPolicy {
230 fn from(config: DurabilityConfig) -> Self {
231 config.checkpoint_policy()
232 }
233}
234
/// Builds the store key for a cursor: `consumer_id`, a `:`, then
/// `partition_key`.
///
/// Neither part is escaped, so the key is unambiguous only when the inputs do
/// not contain `:` themselves — `("a:b", "c")` and `("a", "b:c")` both yield
/// `"a:b:c"`.
#[must_use]
pub fn cursor_key_for(consumer_id: &str, partition_key: &str) -> String {
    [consumer_id, partition_key].join(":")
}
240
241#[cfg(test)]
242mod tests;