crabka-broker 0.3.6

Single-node Apache Kafka-compatible broker (MVP)
Documentation
//! In-memory per-`(group, topicId, partition)` share-delivery state.
//!
//! Reconstructed by folding a `ShareSnapshot` then any subsequent
//! `ShareUpdate` deltas over it (`apply_snapshot` / `apply_update`). The
//! merge logic mirrors KIP-932's `WriteShareGroupState` semantics: advance the
//! share-partition start offset (SPSO), drop in-memory batches fully below
//! it, and upsert written batches keyed by their first offset.

use crate::share_coordinator::persistence::{ShareSnapshotValue, ShareUpdateValue, StateBatch};

/// In-memory share-delivery state for a single `(group, topicId, partition)`.
///
/// A value starts out as `Default` and is brought up to date by calling
/// `apply_snapshot` and then `apply_update` for each later delta;
/// `to_snapshot` turns the current state back into a snapshot value.
#[derive(Debug, Clone, Default)]
pub struct SharePartitionState {
    /// Copied from the snapshot by `apply_snapshot`; `apply_update` leaves it
    /// unchanged.
    pub state_epoch: i32,
    /// Overwritten by every applied snapshot and every applied update.
    pub leader_epoch: i32,
    /// Share-partition start offset (SPSO). `apply_snapshot` sets it
    /// unconditionally; `merge_batches` only ever moves it forward.
    pub start_offset: i64,
    /// Overwritten by every applied snapshot and every applied update.
    pub delivery_complete_count: i32,
    /// Tracked batches. After `merge_batches` runs they are sorted ascending
    /// by `first_offset` and none of them has a `last_offset` below
    /// `start_offset`.
    pub state_batches: Vec<StateBatch>,
    /// Epoch taken from the last applied snapshot; `to_snapshot` emits this
    /// value plus one.
    pub snapshot_epoch: i64,
    /// Not modified by `apply_snapshot`, `apply_update` or `merge_batches`.
    /// NOTE(review): presumably the log offset of the most recent snapshot
    /// record, maintained by the caller — confirm against the call sites.
    pub last_snapshot_offset: i64,
    /// Number of `apply_update` calls since the last `apply_snapshot`, which
    /// resets it to zero.
    pub updates_since_snapshot: u32,
}

impl SharePartitionState {
    /// Replaces the tracked state with the contents of a snapshot record.
    ///
    /// Every field carried by `v` overwrites its counterpart on `self`, and
    /// the count of updates folded since the last snapshot restarts at zero.
    /// `last_snapshot_offset` is left as it was.
    pub fn apply_snapshot(&mut self, v: &ShareSnapshotValue) {
        let ShareSnapshotValue {
            snapshot_epoch,
            state_epoch,
            leader_epoch,
            start_offset,
            delivery_complete_count,
            state_batches,
        } = v;

        self.snapshot_epoch = *snapshot_epoch;
        self.state_epoch = *state_epoch;
        self.leader_epoch = *leader_epoch;
        self.start_offset = *start_offset;
        self.delivery_complete_count = *delivery_complete_count;
        // `clone_from` may reuse the buffer this state already owns.
        self.state_batches.clone_from(state_batches);
        self.updates_since_snapshot = 0;
    }

    /// Folds one update delta into the current state.
    ///
    /// The delta's batches and start offset go through [`Self::merge_batches`];
    /// its leader epoch and delivery-complete count overwrite the stored
    /// ones. `state_epoch` and `snapshot_epoch` are not changed, and
    /// `updates_since_snapshot` grows by one.
    pub fn apply_update(&mut self, v: &ShareUpdateValue) {
        self.merge_batches(v.start_offset, &v.state_batches);
        self.leader_epoch = v.leader_epoch;
        self.delivery_complete_count = v.delivery_complete_count;
        self.updates_since_snapshot += 1;
    }

    /// Merges `written` into the tracked batches and moves the SPSO forward.
    ///
    /// Steps, in order:
    /// 1. `start_offset` becomes the larger of its current value and
    ///    `new_start`, so it never moves backwards.
    /// 2. Tracked batches whose `last_offset` lies below the SPSO are removed.
    /// 3. Each written batch that does not lie entirely below the SPSO
    ///    replaces every tracked batch sharing its `first_offset`; when
    ///    `written` repeats a `first_offset`, the later entry wins.
    /// 4. The result is sorted ascending by `first_offset`.
    pub fn merge_batches(&mut self, new_start: i64, written: &[StateBatch]) {
        let spso = self.start_offset.max(new_start);
        self.start_offset = spso;

        let batches = &mut self.state_batches;
        batches.retain(|existing| existing.last_offset >= spso);

        for incoming in written.iter().filter(|w| w.last_offset >= spso) {
            let key = incoming.first_offset;
            // Upsert: clear out anything already stored under this key first.
            batches.retain(|existing| existing.first_offset != key);
            batches.push(incoming.clone());
        }

        batches.sort_by_key(|batch| batch.first_offset);
    }

    /// Builds a snapshot value from the current state.
    ///
    /// The returned `snapshot_epoch` is one greater than the stored epoch;
    /// all other fields are copied as they are. `self` is not modified.
    #[must_use]
    pub fn to_snapshot(&self) -> ShareSnapshotValue {
        let next_epoch = self.snapshot_epoch + 1;
        let batches = self.state_batches.clone();
        ShareSnapshotValue {
            snapshot_epoch: next_epoch,
            state_epoch: self.state_epoch,
            leader_epoch: self.leader_epoch,
            start_offset: self.start_offset,
            delivery_complete_count: self.delivery_complete_count,
            state_batches: batches,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// Builds a batch spanning offsets `first` to `last` with fixed delivery
    /// metadata (state 0, count 1).
    fn batch(first: i64, last: i64) -> StateBatch {
        StateBatch {
            first_offset: first,
            last_offset: last,
            delivery_state: 0,
            delivery_count: 1,
        }
    }

    /// Lists the `first_offset` of every tracked batch, in stored order.
    fn first_offsets(state: &SharePartitionState) -> Vec<i64> {
        state.state_batches.iter().map(|b| b.first_offset).collect()
    }

    #[test]
    fn apply_snapshot_then_update_advances_spso_and_drops_sub_spso_batches() {
        let snapshot = ShareSnapshotValue {
            snapshot_epoch: 1,
            state_epoch: 2,
            leader_epoch: 3,
            start_offset: 0,
            delivery_complete_count: 0,
            state_batches: vec![batch(0, 9), batch(10, 19), batch(20, 29)],
        };
        let mut state = SharePartitionState::default();
        state.apply_snapshot(&snapshot);
        assert!(state.state_epoch == 2);
        assert!(state.start_offset == 0);

        // The update moves the SPSO to 20 and writes one new batch.
        let update = ShareUpdateValue {
            snapshot_epoch: 1,
            leader_epoch: 4,
            start_offset: 20,
            delivery_complete_count: 7,
            state_batches: vec![batch(30, 39)],
        };
        state.apply_update(&update);

        assert!(state.start_offset == 20);
        assert!(state.leader_epoch == 4);
        assert!(state.delivery_complete_count == 7);
        // Batches ending before offset 20 are gone; batch(20,29) survives and
        // batch(30,39) is appended.
        assert!(first_offsets(&state) == vec![20, 30]);
        assert!(state.updates_since_snapshot == 1);
    }

    #[test]
    fn merge_upserts_batch_by_first_offset() {
        let mut state = SharePartitionState {
            state_batches: vec![batch(0, 9), batch(10, 19)],
            ..Default::default()
        };
        // Same first offset as an existing batch, but longer and with
        // different delivery metadata.
        let replacement = StateBatch {
            first_offset: 10,
            last_offset: 25,
            delivery_state: 2,
            delivery_count: 5,
        };
        state.merge_batches(0, &[replacement, batch(30, 39)]);

        assert!(state.state_batches.len() == 3);
        let updated = state
            .state_batches
            .iter()
            .find(|b| b.first_offset == 10)
            .unwrap();
        assert!(updated.last_offset == 25);
        assert!(updated.delivery_count == 5);
        // The merged list comes back ordered by first offset.
        assert!(first_offsets(&state) == vec![0, 10, 30]);
    }

    #[test]
    fn merge_drops_written_batch_below_spso() {
        let mut state = SharePartitionState {
            start_offset: 50,
            ..Default::default()
        };
        // batch(0,9) ends well before offset 50, so it must not be stored.
        let stale = [batch(0, 9)];
        state.merge_batches(50, &stale);
        assert!(state.state_batches.is_empty());
    }

    #[test]
    fn to_snapshot_bumps_snapshot_epoch() {
        let state = SharePartitionState {
            snapshot_epoch: 4,
            state_epoch: 1,
            start_offset: 10,
            state_batches: vec![batch(10, 19)],
            ..Default::default()
        };
        let snapshot = state.to_snapshot();
        assert!(snapshot.snapshot_epoch == 5);
        assert!(snapshot.start_offset == 10);
        assert!(snapshot.state_batches == state.state_batches);
    }
}