krafka 0.11.0

A pure Rust, async-native Apache Kafka client
Documentation
//! Offset management for consumers.

use ahash::AHashMap as HashMap;

use crate::{Offset, PartitionId};

/// Offset commit metadata.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct OffsetAndMetadata {
    /// The offset to commit.
    pub offset: Offset,
    /// Leader epoch.
    pub leader_epoch: Option<i32>,
    /// Optional metadata.
    pub metadata: Option<String>,
}

impl OffsetAndMetadata {
    /// Create a new offset with no metadata.
    pub fn new(offset: Offset) -> Self {
        Self {
            offset,
            leader_epoch: None,
            metadata: None,
        }
    }

    /// Create with leader epoch.
    pub fn with_epoch(offset: Offset, epoch: i32) -> Self {
        Self {
            offset,
            leader_epoch: Some(epoch),
            metadata: None,
        }
    }

    /// Create with metadata.
    pub fn with_metadata(offset: Offset, metadata: impl Into<String>) -> Self {
        Self {
            offset,
            leader_epoch: None,
            metadata: Some(metadata.into()),
        }
    }
}

/// Tracks committed and fetched offsets.
///
/// Keyed as `topic → partition → value` using two-level `HashMap` nesting.
/// This gives zero-allocation reads (the inner `HashMap::get` takes `&PartitionId`
/// which is `Copy`, and the outer takes `&str` via `String: Borrow<str>`).
///
/// The previous flat `(String, PartitionId)` key required calling `.to_owned()`
/// on every read path because Rust's `Borrow` trait does not extend to tuples.
#[derive(Debug, Default)]
pub struct OffsetStore {
    /// Committed offsets: topic → partition → metadata.
    committed: HashMap<String, HashMap<PartitionId, OffsetAndMetadata>>,
    /// Current fetch position: topic → partition → offset.
    position: HashMap<String, HashMap<PartitionId, Offset>>,
}

impl OffsetStore {
    /// Create a new offset store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the committed offset for a topic-partition.
    #[inline]
    pub fn commit(&mut self, topic: &str, partition: PartitionId, offset: OffsetAndMetadata) {
        match self.committed.get_mut(topic) {
            Some(inner) => {
                inner.insert(partition, offset);
            }
            None => {
                self.committed
                    .insert(topic.to_owned(), HashMap::from([(partition, offset)]));
            }
        }
    }

    /// Get the committed offset for a topic-partition.
    #[inline]
    pub fn committed(&self, topic: &str, partition: PartitionId) -> Option<&OffsetAndMetadata> {
        self.committed.get(topic)?.get(&partition)
    }

    /// Set the current position for a topic-partition.
    #[inline]
    pub fn set_position(&mut self, topic: &str, partition: PartitionId, offset: Offset) {
        match self.position.get_mut(topic) {
            Some(inner) => {
                inner.insert(partition, offset);
            }
            None => {
                self.position
                    .insert(topic.to_owned(), HashMap::from([(partition, offset)]));
            }
        }
    }

    /// Get the current position for a topic-partition.
    #[inline]
    pub fn position(&self, topic: &str, partition: PartitionId) -> Option<Offset> {
        self.position.get(topic)?.get(&partition).copied()
    }

    /// Iterate over all committed offsets.
    #[inline]
    pub fn all_committed(&self) -> impl Iterator<Item = ((&str, PartitionId), &OffsetAndMetadata)> {
        self.committed
            .iter()
            .flat_map(|(t, parts)| parts.iter().map(move |(p, v)| ((t.as_str(), *p), v)))
    }

    /// Iterate over all positions.
    #[inline]
    pub fn all_positions(&self) -> impl Iterator<Item = ((&str, PartitionId), Offset)> {
        self.position
            .iter()
            .flat_map(|(t, parts)| parts.iter().map(move |(p, v)| ((t.as_str(), *p), *v)))
    }

    /// Clear all offsets for a topic.
    pub fn clear_topic(&mut self, topic: &str) {
        self.committed.remove(topic);
        self.position.remove(topic);
    }

    /// Clear all offsets.
    pub fn clear(&mut self) {
        self.committed.clear();
        self.position.clear();
    }
}

/// Offset reset strategy result.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResetOffset {
    /// Use the earliest available offset.
    Earliest,
    /// Use the latest available offset.
    Latest,
    /// Use a specific offset.
    Specific(Offset),
}

impl ResetOffset {
    /// Convert to the protocol offset value.
    pub fn to_offset(&self) -> Offset {
        match self {
            ResetOffset::Earliest => -2,
            ResetOffset::Latest => -1,
            ResetOffset::Specific(o) => *o,
        }
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    #[test]
    fn test_offset_and_metadata() {
        let om = OffsetAndMetadata::new(100);
        assert_eq!(om.offset, 100);
        assert!(om.leader_epoch.is_none());
        assert!(om.metadata.is_none());

        let om = OffsetAndMetadata::with_epoch(200, 5);
        assert_eq!(om.offset, 200);
        assert_eq!(om.leader_epoch, Some(5));

        let om = OffsetAndMetadata::with_metadata(300, "test");
        assert_eq!(om.metadata, Some("test".to_string()));
    }

    #[test]
    fn test_offset_store() {
        let mut store = OffsetStore::new();

        store.set_position("topic1", 0, 100);
        store.set_position("topic1", 1, 200);
        store.commit("topic1", 0, OffsetAndMetadata::new(50));

        assert_eq!(store.position("topic1", 0), Some(100));
        assert_eq!(store.position("topic1", 1), Some(200));
        assert_eq!(store.position("topic1", 2), None);

        assert_eq!(store.committed("topic1", 0).unwrap().offset, 50);
        assert!(store.committed("topic1", 1).is_none());
    }

    #[test]
    fn test_offset_store_clear() {
        let mut store = OffsetStore::new();
        store.set_position("topic1", 0, 100);
        store.set_position("topic2", 0, 200);

        store.clear_topic("topic1");
        assert!(store.position("topic1", 0).is_none());
        assert_eq!(store.position("topic2", 0), Some(200));

        store.clear();
        assert!(store.position("topic2", 0).is_none());
    }

    #[test]
    fn test_reset_offset() {
        assert_eq!(ResetOffset::Earliest.to_offset(), -2);
        assert_eq!(ResetOffset::Latest.to_offset(), -1);
        assert_eq!(ResetOffset::Specific(42).to_offset(), 42);
    }
}