// link-common 0.5.2-rc.2
//
// Shared Rust implementation for KalamDB link crates.
// Documentation
use crate::consumer::models::ConsumerOffsets;

/// Holds three independent, optional `u64` offsets for a consumer.
///
/// Every field starts as `None` (via the derived `Default`) and is only ever
/// written through the methods on this type.
#[derive(Debug, Clone, Default)]
pub struct OffsetManager {
    /// Value last stored by `set_position` / `reset_position`.
    /// NOTE(review): presumably the next offset to fetch — confirm against callers.
    position: Option<u64>,
    /// Largest offset ever passed to `mark_processed`.
    highest_processed: Option<u64>,
    /// Value last stored by `set_last_committed`.
    last_committed: Option<u64>,
}

impl OffsetManager {
    /// Creates a manager in which no offset has been recorded yet.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the stored position, or `None` if it has never been set.
    pub fn position(&self) -> Option<u64> {
        self.position
    }

    /// Stores `position`, replacing whatever was there before.
    pub fn set_position(&mut self, position: u64) {
        self.position = Some(position);
    }

    /// Records that `offset` has been processed.
    ///
    /// Only the running maximum is kept, so marking an offset lower than one
    /// already seen leaves the stored value unchanged.
    pub fn mark_processed(&mut self, offset: u64) {
        let highest = self.highest_processed.map_or(offset, |seen| seen.max(offset));
        self.highest_processed = Some(highest);
    }

    /// Returns the largest offset passed to [`mark_processed`](Self::mark_processed),
    /// or `None` if nothing has been marked.
    pub fn highest_processed(&self) -> Option<u64> {
        self.highest_processed
    }

    /// Stores `offset` as the last committed offset.
    ///
    /// The value is overwritten unconditionally; no ordering check is made
    /// against the previously stored value.
    pub fn set_last_committed(&mut self, offset: u64) {
        self.last_committed = Some(offset);
    }

    /// Returns the offset that is eligible to be committed, if any.
    ///
    /// That is the highest processed offset, provided it is strictly greater
    /// than the last committed offset or nothing has been committed so far.
    /// Returns `None` when nothing has been processed or when the committed
    /// offset has already caught up.
    pub fn commit_offset(&self) -> Option<u64> {
        // Nothing processed means there is nothing to commit.
        let processed = self.highest_processed?;

        match self.last_committed {
            // Already committed up to (or beyond) what has been processed.
            Some(committed) if processed <= committed => None,
            _ => Some(processed),
        }
    }

    /// Copies the current state into a [`ConsumerOffsets`] value.
    ///
    /// An unset position is reported as `0`; the other two fields are copied
    /// through as-is.
    pub fn snapshot(&self) -> ConsumerOffsets {
        // `u64::default()` is 0, matching the documented fallback.
        let position = self.position.unwrap_or_default();

        ConsumerOffsets {
            position,
            last_committed: self.last_committed,
            highest_processed: self.highest_processed,
        }
    }

    /// Overwrites the stored position with `position`.
    ///
    /// The processed and committed offsets are left untouched.
    pub fn reset_position(&mut self, position: u64) {
        self.set_position(position);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built manager tracks nothing and has nothing to commit.
    #[test]
    fn test_new_offset_manager_has_no_position() {
        let offsets = OffsetManager::new();

        assert_eq!(offsets.position(), None);
        assert_eq!(offsets.highest_processed(), None);
        assert_eq!(offsets.commit_offset(), None);
    }

    /// `set_position` makes the value visible through `position`.
    #[test]
    fn test_set_position() {
        let mut offsets = OffsetManager::new();
        offsets.set_position(100);

        assert_eq!(offsets.position(), Some(100));
    }

    /// `mark_processed` keeps the running maximum, ignoring lower offsets.
    #[test]
    fn test_mark_processed_tracks_highest() {
        let mut offsets = OffsetManager::new();

        // (offset marked, highest expected afterwards)
        for (marked, expected) in [(10, 10), (5, 10), (15, 15)] {
            offsets.mark_processed(marked);
            assert_eq!(offsets.highest_processed(), Some(expected));
        }
    }

    /// Without any processed offset there is nothing to commit.
    #[test]
    fn test_commit_offset_requires_processed() {
        let mut offsets = OffsetManager::new();
        assert_eq!(offsets.commit_offset(), None);

        offsets.mark_processed(50);
        assert_eq!(offsets.commit_offset(), Some(50));
    }

    /// A commit is only offered once processing is ahead of the last commit.
    #[test]
    fn test_commit_offset_only_if_ahead_of_last_committed() {
        let mut offsets = OffsetManager::new();
        offsets.mark_processed(50);
        offsets.set_last_committed(50);
        assert_eq!(offsets.commit_offset(), None);

        offsets.mark_processed(60);
        assert_eq!(offsets.commit_offset(), Some(60));
    }

    /// Resetting the position leaves the processed offset untouched.
    #[test]
    fn test_reset_position_changes_next_fetch() {
        let mut offsets = OffsetManager::new();
        offsets.set_position(100);
        offsets.mark_processed(110);

        offsets.reset_position(50);

        assert_eq!(offsets.position(), Some(50));
        assert_eq!(offsets.highest_processed(), Some(110));
    }

    /// `snapshot` mirrors all three tracked values.
    #[test]
    fn test_snapshot() {
        let mut offsets = OffsetManager::new();
        offsets.set_position(100);
        offsets.mark_processed(105);
        offsets.set_last_committed(102);

        let captured = offsets.snapshot();

        assert_eq!(captured.position, 100);
        assert_eq!(captured.highest_processed, Some(105));
        assert_eq!(captured.last_committed, Some(102));
    }
}