use ahash::AHashMap as HashMap;
use crate::{Offset, PartitionId};
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct OffsetAndMetadata {
pub offset: Offset,
pub leader_epoch: Option<i32>,
pub metadata: Option<String>,
}
impl OffsetAndMetadata {
    /// Creates a value that carries only `offset`.
    ///
    /// Both `leader_epoch` and `metadata` are left as `None`.
    pub fn new(offset: Offset) -> Self {
        Self {
            leader_epoch: None,
            metadata: None,
            offset,
        }
    }

    /// Creates a value for `offset` with `leader_epoch` set to `Some(epoch)`.
    ///
    /// `metadata` is left as `None`.
    pub fn with_epoch(offset: Offset, epoch: i32) -> Self {
        let mut value = Self::new(offset);
        value.leader_epoch = Some(epoch);
        value
    }

    /// Creates a value for `offset` with `metadata` set to the given text.
    ///
    /// Anything convertible into a `String` is accepted; `leader_epoch` is
    /// left as `None`.
    pub fn with_metadata(offset: Offset, metadata: impl Into<String>) -> Self {
        let mut value = Self::new(offset);
        value.metadata = Some(metadata.into());
        value
    }
}
/// In-memory store of offsets, keyed first by topic name and then by partition.
///
/// Two independent tables are kept: the committed offsets (with their
/// metadata) and the positions. The derived `Default` yields a store in which
/// both tables are empty.
#[derive(Debug, Default)]
pub struct OffsetStore {
/// topic name -> partition -> committed offset and its metadata.
committed: HashMap<String, HashMap<PartitionId, OffsetAndMetadata>>,
/// topic name -> partition -> position offset.
position: HashMap<String, HashMap<PartitionId, Offset>>,
}
impl OffsetStore {
    /// Creates a store with no committed offsets and no positions.
    pub fn new() -> Self {
        Default::default()
    }

    /// Writes `value` under `topic` / `partition` in `table`, replacing any
    /// earlier entry for that pair.
    ///
    /// The topic name is copied into an owned `String` only when the topic is
    /// not in the table yet, so updating a known topic does not allocate a key.
    #[inline]
    fn upsert<V>(
        table: &mut HashMap<String, HashMap<PartitionId, V>>,
        topic: &str,
        partition: PartitionId,
        value: V,
    ) {
        if let Some(partitions) = table.get_mut(topic) {
            partitions.insert(partition, value);
        } else {
            table.insert(topic.to_owned(), HashMap::from([(partition, value)]));
        }
    }

    /// Records `offset` as the committed offset for `partition` of `topic`,
    /// overwriting whatever was committed for that pair before.
    #[inline]
    pub fn commit(&mut self, topic: &str, partition: PartitionId, offset: OffsetAndMetadata) {
        Self::upsert(&mut self.committed, topic, partition, offset);
    }

    /// Returns the committed offset for `partition` of `topic`, or `None` when
    /// nothing has been committed for that pair.
    #[inline]
    pub fn committed(&self, topic: &str, partition: PartitionId) -> Option<&OffsetAndMetadata> {
        self.committed
            .get(topic)
            .and_then(|partitions| partitions.get(&partition))
    }

    /// Records `offset` as the position for `partition` of `topic`,
    /// overwriting any position stored for that pair before.
    #[inline]
    pub fn set_position(&mut self, topic: &str, partition: PartitionId, offset: Offset) {
        Self::upsert(&mut self.position, topic, partition, offset);
    }

    /// Returns a copy of the position stored for `partition` of `topic`, or
    /// `None` when no position has been set for that pair.
    #[inline]
    pub fn position(&self, topic: &str, partition: PartitionId) -> Option<Offset> {
        let partitions = self.position.get(topic)?;
        partitions.get(&partition).copied()
    }

    /// Iterates over every committed entry as `((topic, partition), entry)`.
    ///
    /// Entries come straight out of the underlying hash maps, so no particular
    /// order should be relied upon.
    #[inline]
    pub fn all_committed(&self) -> impl Iterator<Item = ((&str, PartitionId), &OffsetAndMetadata)> {
        self.committed.iter().flat_map(|(topic, partitions)| {
            let name = topic.as_str();
            partitions
                .iter()
                .map(move |(partition, entry)| ((name, *partition), entry))
        })
    }

    /// Iterates over every stored position as `((topic, partition), offset)`.
    ///
    /// Entries come straight out of the underlying hash maps, so no particular
    /// order should be relied upon.
    #[inline]
    pub fn all_positions(&self) -> impl Iterator<Item = ((&str, PartitionId), Offset)> {
        self.position.iter().flat_map(|(topic, partitions)| {
            let name = topic.as_str();
            partitions
                .iter()
                .map(move |(partition, offset)| ((name, *partition), *offset))
        })
    }

    /// Drops everything stored for `topic`: both its committed offsets and its
    /// positions. Unknown topics are ignored.
    pub fn clear_topic(&mut self, topic: &str) {
        self.position.remove(topic);
        self.committed.remove(topic);
    }

    /// Empties the store: all committed offsets and all positions are removed.
    pub fn clear(&mut self) {
        self.position.clear();
        self.committed.clear();
    }
}
/// Target to reset an offset to.
///
/// Marked `#[non_exhaustive]`, so further variants may be added later.
/// [`ResetOffset::to_offset`] converts a value into a plain [`Offset`].
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResetOffset {
/// The "earliest" target; `to_offset` maps it to `-2`.
Earliest,
/// The "latest" target; `to_offset` maps it to `-1`.
Latest,
/// An explicit offset; `to_offset` returns the wrapped value unchanged.
Specific(Offset),
}
impl ResetOffset {
pub fn to_offset(&self) -> Offset {
match self {
ResetOffset::Earliest => -2,
ResetOffset::Latest => -1,
ResetOffset::Specific(o) => *o,
}
}
}
#[cfg(test)]
// Tests may unwrap/expect/panic freely: a panic is how a test reports failure.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Each constructor sets its own field and leaves the others at `None`.
    #[test]
    fn test_offset_and_metadata() {
        let om = OffsetAndMetadata::new(100);
        assert_eq!(om.offset, 100);
        assert!(om.leader_epoch.is_none());
        assert!(om.metadata.is_none());

        let om = OffsetAndMetadata::with_epoch(200, 5);
        assert_eq!(om.offset, 200);
        assert_eq!(om.leader_epoch, Some(5));
        assert!(om.metadata.is_none());

        let om = OffsetAndMetadata::with_metadata(300, "test");
        assert_eq!(om.offset, 300);
        assert!(om.leader_epoch.is_none());
        assert_eq!(om.metadata, Some("test".to_string()));
    }

    /// Positions and committed offsets are stored per (topic, partition),
    /// are independent of each other, and are overwritten by later writes.
    #[test]
    fn test_offset_store() {
        let mut store = OffsetStore::new();
        store.set_position("topic1", 0, 100);
        store.set_position("topic1", 1, 200);
        store.commit("topic1", 0, OffsetAndMetadata::new(50));

        assert_eq!(store.position("topic1", 0), Some(100));
        assert_eq!(store.position("topic1", 1), Some(200));
        assert_eq!(store.position("topic1", 2), None);
        assert_eq!(store.committed("topic1", 0).unwrap().offset, 50);
        assert!(store.committed("topic1", 1).is_none());

        // Lookups on a topic that was never written return nothing.
        assert_eq!(store.position("missing", 0), None);
        assert!(store.committed("missing", 0).is_none());

        // A second write to the same (topic, partition) replaces the first.
        store.set_position("topic1", 0, 150);
        assert_eq!(store.position("topic1", 0), Some(150));
        store.commit("topic1", 0, OffsetAndMetadata::with_epoch(75, 3));
        let committed = store.committed("topic1", 0).unwrap();
        assert_eq!(committed.offset, 75);
        assert_eq!(committed.leader_epoch, Some(3));
    }

    /// `clear_topic` removes one topic from both tables; `clear` empties both.
    #[test]
    fn test_offset_store_clear() {
        let mut store = OffsetStore::new();
        store.set_position("topic1", 0, 100);
        store.set_position("topic2", 0, 200);
        store.commit("topic1", 0, OffsetAndMetadata::new(10));
        store.commit("topic2", 0, OffsetAndMetadata::new(20));

        store.clear_topic("topic1");
        assert!(store.position("topic1", 0).is_none());
        assert!(store.committed("topic1", 0).is_none());
        assert_eq!(store.position("topic2", 0), Some(200));
        assert_eq!(store.committed("topic2", 0).unwrap().offset, 20);

        store.clear();
        assert!(store.position("topic2", 0).is_none());
        assert!(store.committed("topic2", 0).is_none());
    }

    /// The `all_*` iterators visit every stored entry exactly once.
    #[test]
    fn test_offset_store_iterators() {
        let mut store = OffsetStore::new();
        assert_eq!(store.all_positions().count(), 0);
        assert_eq!(store.all_committed().count(), 0);

        store.set_position("topic1", 0, 100);
        store.set_position("topic2", 3, 200);
        store.commit("topic1", 0, OffsetAndMetadata::with_epoch(50, 7));

        // Hash-map iteration order is unspecified, so sort before comparing.
        let mut positions: Vec<_> = store.all_positions().collect();
        positions.sort_unstable();
        assert_eq!(positions, vec![(("topic1", 0), 100), (("topic2", 3), 200)]);

        let committed: Vec<_> = store
            .all_committed()
            .map(|((topic, partition), om)| (topic, partition, om.offset, om.leader_epoch))
            .collect();
        assert_eq!(committed, vec![("topic1", 0, 50, Some(7))]);
    }

    /// `to_offset` maps the symbolic targets to their sentinel values and
    /// passes explicit offsets through unchanged.
    #[test]
    fn test_reset_offset() {
        assert_eq!(ResetOffset::Earliest.to_offset(), -2);
        assert_eq!(ResetOffset::Latest.to_offset(), -1);
        assert_eq!(ResetOffset::Specific(42).to_offset(), 42);
    }
}