Skip to main content

liminal/durability/
cursor.rs

1use super::{CheckpointPolicy, DurabilityConfig, DurabilityError, DurableStore};
2
/// Durable read position of a single consumer within a single partition.
///
/// A cursor is identified by the pair (`consumer_id`, `partition_key`) and carries the
/// offset that was last persisted (or loaded) for that pair.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ConsumerCursor {
    /// Identifier of the consumer; expected to stay stable across restarts.
    pub consumer_id: String,
    /// Key of the durable channel partition, in the form `channel_id:partition_index`.
    pub partition_key: String,
    /// Read offset currently persisted for this consumer/partition pair.
    pub current_offset: u64,
}
13
14impl ConsumerCursor {
15    /// Creates a fresh cursor at offset zero.
16    #[must_use]
17    pub fn new(consumer_id: impl Into<String>, partition_key: impl Into<String>) -> Self {
18        Self::from_persisted(consumer_id, partition_key, 0)
19    }
20
21    /// Creates a cursor from an offset read from durable storage.
22    #[must_use]
23    pub fn from_persisted(
24        consumer_id: impl Into<String>,
25        partition_key: impl Into<String>,
26        current_offset: u64,
27    ) -> Self {
28        Self {
29            consumer_id: consumer_id.into(),
30            partition_key: partition_key.into(),
31            current_offset,
32        }
33    }
34
35    /// Resumes a cursor by reading the CAS-backed offset value.
36    ///
37    /// # Errors
38    ///
39    /// Propagates store read errors from [`DurableStore::read_value`].
40    pub async fn resume(
41        consumer_id: impl Into<String>,
42        partition_key: impl Into<String>,
43        store: &dyn DurableStore,
44    ) -> Result<Self, DurabilityError> {
45        let consumer_id = consumer_id.into();
46        let partition_key = partition_key.into();
47        let cursor_key = cursor_key_for(&consumer_id, &partition_key);
48        let current_offset = store.read_value(&cursor_key).await?.unwrap_or(0);
49        Ok(Self::from_persisted(
50            consumer_id,
51            partition_key,
52            current_offset,
53        ))
54    }
55
56    /// Returns the stable consumer identifier.
57    #[must_use]
58    pub fn consumer_id(&self) -> &str {
59        &self.consumer_id
60    }
61
62    /// Returns the durable channel partition key, formatted as `channel_id:partition_index`.
63    #[must_use]
64    pub fn partition_key(&self) -> &str {
65        &self.partition_key
66    }
67
68    /// Returns the current persisted read offset.
69    #[must_use]
70    pub const fn current_offset(&self) -> u64 {
71        self.current_offset
72    }
73
74    /// Returns the haematite CAS key used for this cursor.
75    #[must_use]
76    pub fn cursor_key(&self) -> String {
77        cursor_key_for(&self.consumer_id, &self.partition_key)
78    }
79
80    /// Persists a new cursor position with compare-and-swap.
81    ///
82    /// # Errors
83    ///
84    /// Returns [`DurabilityError::CursorRegression`] when `new_offset` is lower than this
85    /// cursor's current offset, and propagates store CAS errors including stale checkpoints.
86    pub async fn checkpoint(
87        &mut self,
88        store: &dyn DurableStore,
89        new_offset: u64,
90    ) -> Result<(), DurabilityError> {
91        if new_offset < self.current_offset {
92            return Err(DurabilityError::CursorRegression {
93                stored: self.current_offset,
94                attempted: new_offset,
95            });
96        }
97
98        store
99            .cas(&self.cursor_key(), self.current_offset, new_offset)
100            .await?;
101        self.current_offset = new_offset;
102        Ok(())
103    }
104}
105
106/// Drives cursor checkpoints according to the channel's configured policy.
107#[derive(Clone, Debug, PartialEq, Eq)]
108pub struct CheckpointDriver {
109    policy: CheckpointPolicy,
110    messages_since_last_checkpoint: usize,
111    pending_offset: Option<u64>,
112}
113
114impl CheckpointDriver {
115    /// Creates a checkpoint driver from a checkpoint policy or full durability config.
116    #[must_use]
117    pub fn new(policy: impl Into<CheckpointPolicy>) -> Self {
118        Self::from_policy(policy.into())
119    }
120
121    /// Creates a checkpoint driver from an explicit checkpoint policy.
122    #[must_use]
123    pub const fn from_policy(policy: CheckpointPolicy) -> Self {
124        Self {
125            policy,
126            messages_since_last_checkpoint: 0,
127            pending_offset: None,
128        }
129    }
130
131    /// Creates a checkpoint driver from a durability configuration.
132    #[must_use]
133    pub const fn from_config(config: DurabilityConfig) -> Self {
134        Self::from_policy(config.checkpoint_policy())
135    }
136
137    /// Returns the active checkpoint policy.
138    #[must_use]
139    pub const fn policy(&self) -> CheckpointPolicy {
140        self.policy
141    }
142
143    /// Returns the number of processed messages since the last successful checkpoint.
144    #[must_use]
145    pub const fn messages_since_last_checkpoint(&self) -> usize {
146        self.messages_since_last_checkpoint
147    }
148
149    /// Returns the latest processed offset waiting to be checkpointed.
150    #[must_use]
151    pub const fn pending_offset(&self) -> Option<u64> {
152        self.pending_offset
153    }
154
155    /// Records one processed message and checkpoints if the configured policy requires it.
156    ///
157    /// `next_offset` is the partition offset from which the consumer should resume after the
158    /// processed message. The driver delegates all persistence to [`ConsumerCursor::checkpoint`].
159    ///
160    /// # Errors
161    ///
162    /// Returns cursor checkpoint errors from the store, or a configuration error for an invalid
163    /// raw batch policy.
164    pub async fn record_processed(
165        &mut self,
166        cursor: &mut ConsumerCursor,
167        store: &dyn DurableStore,
168        next_offset: u64,
169    ) -> Result<(), DurabilityError> {
170        self.validate_policy()?;
171        self.messages_since_last_checkpoint = self
172            .messages_since_last_checkpoint
173            .checked_add(1)
174            .ok_or_else(|| {
175                DurabilityError::ConfigError("messages since last checkpoint overflow".to_owned())
176            })?;
177        self.pending_offset = Some(next_offset);
178
179        match self.policy {
180            CheckpointPolicy::PerMessage => self.checkpoint_pending(cursor, store).await,
181            CheckpointPolicy::PerBatch(batch_size)
182                if self.messages_since_last_checkpoint >= batch_size =>
183            {
184                self.checkpoint_pending(cursor, store).await
185            }
186            CheckpointPolicy::PerBatch(_) | CheckpointPolicy::ExplicitFlush => Ok(()),
187        }
188    }
189
190    /// Flushes any processed offset that is waiting to be checkpointed.
191    ///
192    /// # Errors
193    ///
194    /// Returns cursor checkpoint errors from the store, or a configuration error for an invalid
195    /// raw batch policy.
196    pub async fn flush(
197        &mut self,
198        cursor: &mut ConsumerCursor,
199        store: &dyn DurableStore,
200    ) -> Result<(), DurabilityError> {
201        self.validate_policy()?;
202        self.checkpoint_pending(cursor, store).await
203    }
204
205    async fn checkpoint_pending(
206        &mut self,
207        cursor: &mut ConsumerCursor,
208        store: &dyn DurableStore,
209    ) -> Result<(), DurabilityError> {
210        let Some(next_offset) = self.pending_offset else {
211            return Ok(());
212        };
213        cursor.checkpoint(store, next_offset).await?;
214        self.messages_since_last_checkpoint = 0;
215        self.pending_offset = None;
216        Ok(())
217    }
218
219    fn validate_policy(&self) -> Result<(), DurabilityError> {
220        if self.policy == CheckpointPolicy::PerBatch(0) {
221            return Err(DurabilityError::ConfigError(
222                "checkpoint batch size must be at least 1".to_owned(),
223            ));
224        }
225        Ok(())
226    }
227}
228
229impl From<DurabilityConfig> for CheckpointPolicy {
230    fn from(config: DurabilityConfig) -> Self {
231        config.checkpoint_policy()
232    }
233}
234
/// Builds the haematite key under which a cursor's compare-and-swap state is stored.
///
/// The key is `consumer_id` and `partition_key` joined by a single `:`. Neither part is
/// escaped, so a `consumer_id` that itself contains `:` can yield the same key as a different
/// consumer/partition pair.
#[must_use]
pub fn cursor_key_for(consumer_id: &str, partition_key: &str) -> String {
    [consumer_id, partition_key].join(":")
}
240
241#[cfg(test)]
242mod tests;