use super::{CheckpointPolicy, DurabilityConfig, DurabilityError, DurableStore};
/// Position of one consumer within one partition.
///
/// A cursor is identified by its `(consumer_id, partition_key)` pair and
/// carries the offset this in-memory value currently holds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerCursor {
    /// Identifier of the consumer that owns this cursor.
    pub consumer_id: String,
    /// Key of the partition this cursor tracks.
    pub partition_key: String,
    /// Offset currently held by this cursor.
    pub current_offset: u64,
}
impl ConsumerCursor {
    /// Creates a cursor for `consumer_id` on `partition_key`, positioned at offset 0.
    #[must_use]
    pub fn new(consumer_id: impl Into<String>, partition_key: impl Into<String>) -> Self {
        Self {
            consumer_id: consumer_id.into(),
            partition_key: partition_key.into(),
            current_offset: 0,
        }
    }

    /// Creates a cursor positioned at an offset the caller already knows.
    ///
    /// The offset is taken as-is; nothing is read from or written to a store.
    #[must_use]
    pub fn from_persisted(
        consumer_id: impl Into<String>,
        partition_key: impl Into<String>,
        current_offset: u64,
    ) -> Self {
        let consumer_id = consumer_id.into();
        let partition_key = partition_key.into();
        Self {
            consumer_id,
            partition_key,
            current_offset,
        }
    }

    /// Rebuilds a cursor from whatever offset `store` holds for it.
    ///
    /// The offset is read from the key produced by `cursor_key_for` for this
    /// consumer/partition pair. When the read yields no value, the cursor
    /// starts at offset 0.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the store read.
    pub async fn resume(
        consumer_id: impl Into<String>,
        partition_key: impl Into<String>,
        store: &dyn DurableStore,
    ) -> Result<Self, DurabilityError> {
        let consumer_id: String = consumer_id.into();
        let partition_key: String = partition_key.into();
        let key = cursor_key_for(&consumer_id, &partition_key);
        let persisted = store.read_value(&key).await?;
        Ok(Self {
            consumer_id,
            partition_key,
            current_offset: persisted.unwrap_or(0),
        })
    }

    /// Returns the identifier of the consumer that owns this cursor.
    #[must_use]
    pub fn consumer_id(&self) -> &str {
        self.consumer_id.as_str()
    }

    /// Returns the key of the partition this cursor tracks.
    #[must_use]
    pub fn partition_key(&self) -> &str {
        self.partition_key.as_str()
    }

    /// Returns the offset currently held by this cursor.
    #[must_use]
    pub const fn current_offset(&self) -> u64 {
        self.current_offset
    }

    /// Returns the store key under which this cursor's offset is kept.
    #[must_use]
    pub fn cursor_key(&self) -> String {
        cursor_key_for(self.consumer_id(), self.partition_key())
    }

    /// Moves the cursor to `new_offset` and writes that offset to `store`.
    ///
    /// The write goes through `DurableStore::cas`, called with this cursor's
    /// key, the offset currently held, and `new_offset`, in that order.
    /// NOTE(review): presumably the held offset is the expected value of a
    /// compare-and-swap — confirm against the trait definition.
    ///
    /// The in-memory offset changes only after the store call succeeds. A
    /// `new_offset` equal to the current offset is accepted and still issues
    /// the store call.
    ///
    /// # Errors
    ///
    /// * [`DurabilityError::CursorRegression`] when `new_offset` is lower than
    ///   the offset currently held; the store is not contacted in that case.
    /// * Any error returned by the store call.
    pub async fn checkpoint(
        &mut self,
        store: &dyn DurableStore,
        new_offset: u64,
    ) -> Result<(), DurabilityError> {
        let stored = self.current_offset;
        if stored > new_offset {
            return Err(DurabilityError::CursorRegression {
                stored,
                attempted: new_offset,
            });
        }

        let key = self.cursor_key();
        store.cas(&key, stored, new_offset).await?;

        // Only adopt the new offset once the store has accepted it.
        self.current_offset = new_offset;
        Ok(())
    }
}
/// Decides when a processed offset is written to the store, according to a
/// [`CheckpointPolicy`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CheckpointDriver {
    /// Policy that determines when the pending offset is checkpointed.
    policy: CheckpointPolicy,
    /// Number of messages recorded since the pending offset was last
    /// checkpointed successfully.
    messages_since_last_checkpoint: usize,
    /// Most recently recorded offset that has not been checkpointed yet;
    /// `None` when nothing is waiting to be written.
    pending_offset: Option<u64>,
}
impl CheckpointDriver {
    /// Creates a driver from anything convertible into a [`CheckpointPolicy`].
    #[must_use]
    pub fn new(policy: impl Into<CheckpointPolicy>) -> Self {
        let policy: CheckpointPolicy = policy.into();
        Self::from_policy(policy)
    }

    /// Creates a driver with the given policy, no recorded messages and no
    /// pending offset.
    #[must_use]
    pub const fn from_policy(policy: CheckpointPolicy) -> Self {
        Self {
            pending_offset: None,
            messages_since_last_checkpoint: 0,
            policy,
        }
    }

    /// Creates a driver using the checkpoint policy reported by `config`.
    #[must_use]
    pub const fn from_config(config: DurabilityConfig) -> Self {
        Self::from_policy(config.checkpoint_policy())
    }

    /// Returns the policy this driver applies.
    #[must_use]
    pub const fn policy(&self) -> CheckpointPolicy {
        self.policy
    }

    /// Returns how many messages were recorded since the last successful
    /// checkpoint performed by this driver.
    #[must_use]
    pub const fn messages_since_last_checkpoint(&self) -> usize {
        self.messages_since_last_checkpoint
    }

    /// Returns the recorded offset that is still waiting to be checkpointed,
    /// if any.
    #[must_use]
    pub const fn pending_offset(&self) -> Option<u64> {
        self.pending_offset
    }

    /// Records one processed message and checkpoints if the policy says so.
    ///
    /// `next_offset` replaces any previously pending offset. What happens
    /// next depends on the policy:
    ///
    /// * `PerMessage` — the pending offset is checkpointed immediately.
    /// * `PerBatch(n)` — it is checkpointed once at least `n` messages have
    ///   been recorded since the last successful checkpoint.
    /// * `ExplicitFlush` — nothing is written until [`Self::flush`] is called.
    ///
    /// If the checkpoint fails, the message count and pending offset are
    /// left as recorded.
    ///
    /// # Errors
    ///
    /// * [`DurabilityError::ConfigError`] when the policy is `PerBatch(0)` or
    ///   the message counter would overflow; in both cases nothing is recorded.
    /// * Any error returned by [`ConsumerCursor::checkpoint`].
    pub async fn record_processed(
        &mut self,
        cursor: &mut ConsumerCursor,
        store: &dyn DurableStore,
        next_offset: u64,
    ) -> Result<(), DurabilityError> {
        self.validate_policy()?;

        let Some(processed) = self.messages_since_last_checkpoint.checked_add(1) else {
            return Err(DurabilityError::ConfigError(
                "messages since last checkpoint overflow".to_owned(),
            ));
        };
        self.messages_since_last_checkpoint = processed;
        self.pending_offset = Some(next_offset);

        // Decide first, then await, so the policy match stays a pure expression.
        let due = match self.policy {
            CheckpointPolicy::PerMessage => true,
            CheckpointPolicy::PerBatch(batch_size) => processed >= batch_size,
            CheckpointPolicy::ExplicitFlush => false,
        };

        if due {
            self.checkpoint_pending(cursor, store).await
        } else {
            Ok(())
        }
    }

    /// Checkpoints the pending offset, if there is one, regardless of policy.
    ///
    /// # Errors
    ///
    /// * [`DurabilityError::ConfigError`] when the policy is `PerBatch(0)`.
    /// * Any error returned by [`ConsumerCursor::checkpoint`].
    pub async fn flush(
        &mut self,
        cursor: &mut ConsumerCursor,
        store: &dyn DurableStore,
    ) -> Result<(), DurabilityError> {
        self.validate_policy()?;
        self.checkpoint_pending(cursor, store).await
    }

    /// Writes the pending offset through `cursor` and, on success, clears the
    /// pending offset and resets the message counter.
    ///
    /// Does nothing (and leaves the counter untouched) when no offset is
    /// pending.
    async fn checkpoint_pending(
        &mut self,
        cursor: &mut ConsumerCursor,
        store: &dyn DurableStore,
    ) -> Result<(), DurabilityError> {
        if let Some(offset) = self.pending_offset {
            cursor.checkpoint(store, offset).await?;
            // Reached only when the checkpoint succeeded.
            self.pending_offset = None;
            self.messages_since_last_checkpoint = 0;
        }
        Ok(())
    }

    /// Rejects a `PerBatch(0)` policy; every other policy is accepted.
    fn validate_policy(&self) -> Result<(), DurabilityError> {
        if self.policy == CheckpointPolicy::PerBatch(0) {
            Err(DurabilityError::ConfigError(
                "checkpoint batch size must be at least 1".to_owned(),
            ))
        } else {
            Ok(())
        }
    }
}
/// Lets a [`DurabilityConfig`] be used wherever a [`CheckpointPolicy`] is
/// expected, by taking the policy the configuration reports.
impl From<DurabilityConfig> for CheckpointPolicy {
    /// Returns `config.checkpoint_policy()`.
    fn from(config: DurabilityConfig) -> Self {
        config.checkpoint_policy()
    }
}
/// Builds the store key for a consumer/partition pair as
/// `"<consumer_id>:<partition_key>"`.
///
/// The two parts are joined verbatim, with no escaping. If either part
/// contains `:`, different pairs can produce the same key (for example
/// `("a:b", "c")` and `("a", "b:c")` both yield `"a:b:c"`).
/// NOTE(review): callers presumably guarantee colon-free identifiers —
/// confirm before accepting arbitrary input here.
#[must_use]
pub fn cursor_key_for(consumer_id: &str, partition_key: &str) -> String {
    [consumer_id, partition_key].join(":")
}
#[cfg(test)]
mod tests;