use std::collections::HashMap;
use std::fmt;
use std::time::Duration;
use bytes::Bytes;
use tracing::{debug, info};
use super::record::ConsumerRecord;
use super::{AutoOffsetReset, Consumer};
use crate::auth::AuthConfig;
use crate::error::{KrafkaError, Result};
use crate::{Offset, PartitionId, Timestamp};
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableChange {
pub key: Bytes,
pub old_value: Option<Bytes>,
pub new_value: Option<Bytes>,
pub partition: PartitionId,
pub offset: Offset,
pub timestamp: Timestamp,
}
impl TableChange {
#[inline]
pub fn is_delete(&self) -> bool {
self.new_value.is_none()
}
#[inline]
pub fn is_insert(&self) -> bool {
self.old_value.is_none() && self.new_value.is_some()
}
#[inline]
pub fn is_update(&self) -> bool {
self.old_value.is_some() && self.new_value.is_some()
}
}
#[derive(Default, Clone)]
pub struct CompactedTable {
entries: HashMap<Bytes, Bytes>,
records_processed: u64,
tombstones_processed: u64,
}
impl CompactedTable {
pub fn new() -> Self {
Self::default()
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
entries: HashMap::with_capacity(capacity),
records_processed: 0,
tombstones_processed: 0,
}
}
#[must_use = "use ingest() if changes are not needed"]
pub fn apply(&mut self, records: &[ConsumerRecord]) -> Vec<TableChange> {
let mut changes = Vec::with_capacity(records.len());
for record in records {
self.records_processed += 1;
let Some(ref key) = record.key else {
continue;
};
let change = self.apply_keyed_record(key, record);
changes.push(change);
}
changes
}
pub fn get(&self, key: &[u8]) -> Option<&Bytes> {
self.entries.get(key)
}
pub fn contains_key(&self, key: &[u8]) -> bool {
self.entries.contains_key(key)
}
#[inline]
pub fn len(&self) -> usize {
self.entries.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn iter(&self) -> impl Iterator<Item = (&Bytes, &Bytes)> {
self.entries.iter()
}
pub fn keys(&self) -> impl Iterator<Item = &Bytes> {
self.entries.keys()
}
pub fn values(&self) -> impl Iterator<Item = &Bytes> {
self.entries.values()
}
#[must_use]
pub fn snapshot(&self) -> HashMap<Bytes, Bytes> {
self.entries.clone()
}
pub fn records_processed(&self) -> u64 {
self.records_processed
}
pub fn tombstones_processed(&self) -> u64 {
self.tombstones_processed
}
pub fn ingest(&mut self, records: &[ConsumerRecord]) {
for record in records {
self.records_processed += 1;
let Some(ref key) = record.key else {
continue;
};
self.ingest_keyed_record(key, record);
}
}
fn apply_keyed_record(&mut self, key: &Bytes, record: &ConsumerRecord) -> TableChange {
if record.is_tombstone() {
self.tombstones_processed += 1;
let old_value = self.entries.remove(key.as_ref());
TableChange {
key: key.clone(),
old_value,
new_value: None,
partition: record.partition,
offset: record.offset,
timestamp: record.timestamp,
}
} else {
let Some(value) = record.value.clone() else {
unreachable!("non-tombstone compacted record must have a value");
};
let key_owned = key.clone();
let old_value = self.entries.insert(key_owned.clone(), value.clone());
TableChange {
key: key_owned,
old_value,
new_value: Some(value),
partition: record.partition,
offset: record.offset,
timestamp: record.timestamp,
}
}
}
fn ingest_keyed_record(&mut self, key: &Bytes, record: &ConsumerRecord) {
if record.is_tombstone() {
self.tombstones_processed += 1;
self.entries.remove(key.as_ref());
} else {
let Some(value) = record.value.clone() else {
unreachable!("non-tombstone compacted record must have a value");
};
self.entries.insert(key.clone(), value);
}
}
pub fn clear(&mut self) {
self.entries.clear();
self.records_processed = 0;
self.tombstones_processed = 0;
}
}
impl<'a> IntoIterator for &'a CompactedTable {
type Item = (&'a Bytes, &'a Bytes);
type IntoIter = std::collections::hash_map::Iter<'a, Bytes, Bytes>;
fn into_iter(self) -> Self::IntoIter {
self.entries.iter()
}
}
impl IntoIterator for CompactedTable {
type Item = (Bytes, Bytes);
type IntoIter = std::collections::hash_map::IntoIter<Bytes, Bytes>;
fn into_iter(self) -> Self::IntoIter {
self.entries.into_iter()
}
}
impl fmt::Debug for CompactedTable {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CompactedTable")
.field("len", &self.entries.len())
.field("records_processed", &self.records_processed)
.field("tombstones_processed", &self.tombstones_processed)
.finish()
}
}
impl PartialEq for CompactedTable {
fn eq(&self, other: &Self) -> bool {
self.entries == other.entries
}
}
impl Eq for CompactedTable {}
pub struct CompactedTopicConsumer {
consumer: Consumer,
topic: String,
table: CompactedTable,
caught_up: bool,
}
impl fmt::Debug for CompactedTopicConsumer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CompactedTopicConsumer")
.field("topic", &self.topic)
.field("caught_up", &self.caught_up)
.field("table", &self.table)
.finish()
}
}
impl CompactedTopicConsumer {
pub fn builder() -> CompactedTopicConsumerBuilder {
CompactedTopicConsumerBuilder::default()
}
pub fn from_consumer(consumer: Consumer, topic: impl Into<String>) -> Self {
Self {
consumer,
topic: topic.into(),
table: CompactedTable::new(),
caught_up: false,
}
}
pub async fn scan(&mut self, poll_timeout: Duration) -> Result<()> {
let assignments = self.consumer.assignment().await;
if assignments.get(&self.topic).is_none_or(|p| p.is_empty()) {
return Err(KrafkaError::invalid_state(format!(
"no partitions assigned for topic '{}'; \
assign partitions before calling scan()",
self.topic
)));
}
info!("Starting compacted topic scan for '{}'", self.topic);
loop {
let mut records = self.consumer.poll(poll_timeout).await?;
let before_len = records.len();
records.retain(|r| r.topic == self.topic);
let filtered = before_len - records.len();
if filtered > 0 {
debug!(
"Filtered out {} record(s) from other topics during scan for '{}'",
filtered, self.topic
);
}
self.table.ingest(&records);
if self.check_caught_up().await {
self.caught_up = true;
info!(
"Compacted topic scan complete for '{}': {} keys, {} records processed, \
{} tombstones",
self.topic,
self.table.len(),
self.table.records_processed(),
self.table.tombstones_processed(),
);
return Ok(());
}
}
}
pub async fn poll(&mut self, timeout: Duration) -> Result<Vec<TableChange>> {
let mut records = self.consumer.poll(timeout).await?;
let before_len = records.len();
records.retain(|r| r.topic == self.topic);
let filtered = before_len - records.len();
if filtered > 0 {
debug!(
"Filtered out {} record(s) from other topics during poll for '{}'",
filtered, self.topic
);
}
let changes = self.table.apply(&records);
if !self.caught_up && self.check_caught_up().await {
self.caught_up = true;
debug!(
"CompactedTopicConsumer for '{}' caught up via poll()",
self.topic
);
}
Ok(changes)
}
pub fn table(&self) -> &CompactedTable {
&self.table
}
pub fn table_mut(&mut self) -> &mut CompactedTable {
&mut self.table
}
pub fn is_caught_up(&self) -> bool {
self.caught_up
}
pub fn topic(&self) -> &str {
&self.topic
}
pub fn consumer(&self) -> &Consumer {
&self.consumer
}
pub fn consumer_mut(&mut self) -> &mut Consumer {
&mut self.consumer
}
pub fn into_parts(self) -> (Consumer, CompactedTable) {
(self.consumer, self.table)
}
pub async fn close(&self) {
self.consumer.close().await;
}
async fn check_caught_up(&self) -> bool {
let assignments = self.consumer.assignment().await;
let Some(partitions) = assignments.get(&self.topic) else {
return false;
};
for &partition in partitions {
let position = self.consumer.position(&self.topic, partition).await;
let high_watermark = self
.consumer
.cached_end_offset(&self.topic, partition)
.await;
match (position, high_watermark) {
(Some(pos), Some(hw)) if pos >= hw => continue,
(_, Some(0)) => continue,
_ => return false,
}
}
true
}
}
#[derive(Default)]
pub struct CompactedTopicConsumerBuilder {
bootstrap_servers: Option<String>,
topic: Option<String>,
client_id: Option<String>,
request_timeout: Option<Duration>,
fetch_max_bytes: Option<i32>,
max_partition_fetch_bytes: Option<i32>,
max_poll_records: Option<i32>,
auth: Option<AuthConfig>,
#[cfg(feature = "socks5")]
proxy: Option<crate::network::ProxyConfig>,
}
impl CompactedTopicConsumerBuilder {
pub fn bootstrap_servers(mut self, servers: impl Into<String>) -> Self {
self.bootstrap_servers = Some(servers.into());
self
}
pub fn topic(mut self, topic: impl Into<String>) -> Self {
self.topic = Some(topic.into());
self
}
pub fn client_id(mut self, client_id: impl Into<String>) -> Self {
self.client_id = Some(client_id.into());
self
}
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = Some(timeout);
self
}
pub fn fetch_max_bytes(mut self, bytes: i32) -> Self {
self.fetch_max_bytes = Some(bytes);
self
}
pub fn max_partition_fetch_bytes(mut self, bytes: i32) -> Self {
self.max_partition_fetch_bytes = Some(bytes);
self
}
pub fn max_poll_records(mut self, max: i32) -> Self {
self.max_poll_records = Some(max);
self
}
pub fn auth(mut self, auth: AuthConfig) -> Self {
self.auth = Some(auth);
self
}
#[cfg(feature = "socks5")]
pub fn proxy(mut self, proxy: crate::network::ProxyConfig) -> Self {
self.proxy = Some(proxy);
self
}
pub async fn build(self) -> Result<CompactedTopicConsumer> {
let bootstrap_servers = self
.bootstrap_servers
.ok_or_else(|| KrafkaError::config("bootstrap_servers is required"))?;
let topic = self
.topic
.ok_or_else(|| KrafkaError::config("topic is required for CompactedTopicConsumer"))?;
let mut consumer_builder = Consumer::builder()
.bootstrap_servers(&bootstrap_servers)
.auto_offset_reset(AutoOffsetReset::Earliest)
.enable_auto_commit(false);
if let Some(client_id) = self.client_id {
consumer_builder = consumer_builder.client_id(client_id);
}
if let Some(timeout) = self.request_timeout {
consumer_builder = consumer_builder.request_timeout(timeout);
}
if let Some(bytes) = self.fetch_max_bytes {
consumer_builder = consumer_builder.fetch_max_bytes(bytes);
}
if let Some(bytes) = self.max_partition_fetch_bytes {
consumer_builder = consumer_builder.max_partition_fetch_bytes(bytes);
}
if let Some(max) = self.max_poll_records {
consumer_builder = consumer_builder.max_poll_records(max);
}
if let Some(auth) = self.auth {
consumer_builder = consumer_builder.auth(auth);
}
#[cfg(feature = "socks5")]
if let Some(proxy) = self.proxy {
consumer_builder = consumer_builder.proxy(proxy);
}
let consumer = consumer_builder.build().await?;
consumer
.metadata
.refresh_for_topics(Some(&[&topic]))
.await?;
let partition_count = consumer.metadata.partition_count(&topic).ok_or_else(|| {
KrafkaError::config(format!("topic '{topic}' not found in cluster metadata"))
})?;
let partition_count = PartitionId::try_from(partition_count).map_err(|_| {
KrafkaError::config(format!(
"topic '{topic}' has too many partitions to fit in PartitionId"
))
})?;
let partitions: Vec<PartitionId> = (0..partition_count).collect();
consumer.assign(&topic, partitions).await?;
info!(
"CompactedTopicConsumer initialized for '{}' with {} partitions",
topic, partition_count
);
Ok(CompactedTopicConsumer::from_consumer(consumer, topic))
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
fn make_record(
key: Option<&str>,
value: Option<&str>,
partition: PartitionId,
offset: Offset,
) -> ConsumerRecord {
ConsumerRecord {
topic: "test-topic".to_string(),
partition,
offset,
timestamp: offset * 1000,
timestamp_type: 0,
key: key.map(|k| Bytes::from(k.to_string())),
value: value.map(|v| Bytes::from(v.to_string())),
headers: Vec::new(),
leader_epoch: None,
delivery_count: None,
}
}
#[test]
fn test_table_insert() {
let mut table = CompactedTable::new();
let records = vec![
make_record(Some("k1"), Some("v1"), 0, 0),
make_record(Some("k2"), Some("v2"), 0, 1),
];
let changes = table.apply(&records);
assert_eq!(table.len(), 2);
assert_eq!(table.get(b"k1"), Some(&Bytes::from("v1")));
assert_eq!(table.get(b"k2"), Some(&Bytes::from("v2")));
assert_eq!(changes.len(), 2);
assert!(changes[0].is_insert());
assert!(changes[1].is_insert());
assert_eq!(table.records_processed(), 2);
assert_eq!(table.tombstones_processed(), 0);
}
#[test]
fn test_table_update() {
let mut table = CompactedTable::new();
table.ingest(&[make_record(Some("k1"), Some("old"), 0, 0)]);
let changes = table.apply(&[make_record(Some("k1"), Some("new"), 0, 5)]);
assert_eq!(table.len(), 1);
assert_eq!(table.get(b"k1"), Some(&Bytes::from("new")));
assert_eq!(changes.len(), 1);
assert!(changes[0].is_update());
assert_eq!(changes[0].old_value, Some(Bytes::from("old")));
assert_eq!(changes[0].new_value, Some(Bytes::from("new")));
}
#[test]
fn test_table_tombstone() {
let mut table = CompactedTable::new();
table.ingest(&[
make_record(Some("k1"), Some("v1"), 0, 0),
make_record(Some("k2"), Some("v2"), 0, 1),
]);
let changes = table.apply(&[make_record(Some("k1"), None, 0, 10)]);
assert_eq!(table.len(), 1);
assert!(!table.contains_key(b"k1"));
assert_eq!(table.get(b"k2"), Some(&Bytes::from("v2")));
assert_eq!(changes.len(), 1);
assert!(changes[0].is_delete());
assert_eq!(changes[0].old_value, Some(Bytes::from("v1")));
assert_eq!(changes[0].new_value, None);
assert_eq!(table.tombstones_processed(), 1);
}
#[test]
fn test_table_tombstone_for_missing_key() {
let mut table = CompactedTable::new();
let changes = table.apply(&[make_record(Some("missing"), None, 0, 0)]);
assert!(table.is_empty());
assert_eq!(changes.len(), 1);
assert!(changes[0].is_delete());
assert_eq!(changes[0].old_value, None);
assert_eq!(table.tombstones_processed(), 1);
}
#[test]
fn test_table_skips_keyless() {
let mut table = CompactedTable::new();
let records = vec![
make_record(None, Some("value-without-key"), 0, 0),
make_record(Some("k1"), Some("v1"), 0, 1),
];
let changes = table.apply(&records);
assert_eq!(table.len(), 1);
assert_eq!(changes.len(), 1);
assert_eq!(table.records_processed(), 2);
}
#[test]
fn test_table_full_lifecycle() {
let mut table = CompactedTable::new();
let changes = table.apply(&[
make_record(Some("user-1"), Some("Alice"), 0, 0),
make_record(Some("user-2"), Some("Bob"), 0, 1),
]);
assert_eq!(table.len(), 2);
assert!(changes.iter().all(|c| c.is_insert()));
let changes = table.apply(&[make_record(Some("user-1"), Some("Alice V2"), 0, 2)]);
assert_eq!(table.get(b"user-1"), Some(&Bytes::from("Alice V2")));
assert!(changes[0].is_update());
let changes = table.apply(&[make_record(Some("user-2"), None, 0, 3)]);
assert_eq!(table.len(), 1);
assert!(changes[0].is_delete());
let changes = table.apply(&[make_record(Some("user-2"), Some("Bob V2"), 0, 4)]);
assert_eq!(table.len(), 2);
assert!(changes[0].is_insert());
}
#[test]
fn test_table_empty_input() {
let mut table = CompactedTable::new();
table.ingest(&[make_record(Some("k1"), Some("v1"), 0, 0)]);
let changes = table.apply(&[]);
assert_eq!(table.len(), 1);
assert!(changes.is_empty());
}
#[test]
fn test_table_multiple_partitions() {
let mut table = CompactedTable::new();
let records = vec![
make_record(Some("k1"), Some("v1"), 0, 0),
make_record(Some("k2"), Some("v2"), 1, 0),
make_record(Some("k1"), Some("v1-updated"), 0, 1),
];
let changes = table.apply(&records);
assert_eq!(table.len(), 2);
assert_eq!(table.get(b"k1"), Some(&Bytes::from("v1-updated")));
assert_eq!(changes.len(), 3);
assert!(changes[0].is_insert());
assert!(changes[1].is_insert());
assert!(changes[2].is_update());
assert_eq!(changes[0].partition, 0);
assert_eq!(changes[1].partition, 1);
}
#[test]
fn test_table_with_capacity() {
let table = CompactedTable::with_capacity(100);
assert!(table.is_empty());
assert_eq!(table.records_processed(), 0);
}
#[test]
fn test_table_iter() {
let mut table = CompactedTable::new();
table.ingest(&[
make_record(Some("a"), Some("1"), 0, 0),
make_record(Some("b"), Some("2"), 0, 1),
]);
let items: HashMap<&Bytes, &Bytes> = table.iter().collect();
assert_eq!(items.len(), 2);
assert_eq!(items[&Bytes::from("a")], &Bytes::from("1"));
assert_eq!(items[&Bytes::from("b")], &Bytes::from("2"));
}
#[test]
fn test_table_snapshot() {
let mut table = CompactedTable::new();
table.ingest(&[
make_record(Some("k1"), Some("v1"), 0, 0),
make_record(Some("k2"), Some("v2"), 0, 1),
]);
let snap = table.snapshot();
assert_eq!(snap.len(), 2);
assert_eq!(snap.get(&Bytes::from("k1")), Some(&Bytes::from("v1")));
}
#[test]
fn test_table_debug() {
let mut table = CompactedTable::new();
table.ingest(&[
make_record(Some("k1"), Some("v1"), 0, 0),
make_record(Some("k2"), None, 0, 1),
]);
let debug = format!("{table:?}");
assert!(debug.contains("len: 1"));
assert!(debug.contains("records_processed: 2"));
assert!(debug.contains("tombstones_processed: 1"));
}
#[test]
fn test_table_clear() {
let mut table = CompactedTable::new();
table.ingest(&[
make_record(Some("k1"), Some("v1"), 0, 0),
make_record(Some("k2"), None, 0, 1),
]);
assert_eq!(table.len(), 1);
assert_eq!(table.records_processed(), 2);
assert_eq!(table.tombstones_processed(), 1);
table.clear();
assert!(table.is_empty());
assert_eq!(table.records_processed(), 0);
assert_eq!(table.tombstones_processed(), 0);
}
#[test]
fn test_table_into_iterator() {
let mut table = CompactedTable::new();
table.ingest(&[
make_record(Some("a"), Some("1"), 0, 0),
make_record(Some("b"), Some("2"), 0, 1),
]);
let items: HashMap<&Bytes, &Bytes> = (&table).into_iter().collect();
assert_eq!(items.len(), 2);
assert_eq!(items[&Bytes::from("a")], &Bytes::from("1"));
}
#[test]
fn test_table_change_classification() {
let insert = TableChange {
key: Bytes::from("k"),
old_value: None,
new_value: Some(Bytes::from("v")),
partition: 0,
offset: 0,
timestamp: 0,
};
assert!(insert.is_insert());
assert!(!insert.is_update());
assert!(!insert.is_delete());
let update = TableChange {
key: Bytes::from("k"),
old_value: Some(Bytes::from("old")),
new_value: Some(Bytes::from("new")),
partition: 0,
offset: 1,
timestamp: 1000,
};
assert!(!update.is_insert());
assert!(update.is_update());
assert!(!update.is_delete());
let delete = TableChange {
key: Bytes::from("k"),
old_value: Some(Bytes::from("v")),
new_value: None,
partition: 0,
offset: 2,
timestamp: 2000,
};
assert!(!delete.is_insert());
assert!(!delete.is_update());
assert!(delete.is_delete());
}
#[test]
fn test_table_keys() {
let mut table = CompactedTable::new();
table.ingest(&[
make_record(Some("a"), Some("1"), 0, 0),
make_record(Some("b"), Some("2"), 0, 1),
]);
let mut keys: Vec<&Bytes> = table.keys().collect();
keys.sort();
assert_eq!(keys, vec![&Bytes::from("a"), &Bytes::from("b")]);
}
#[test]
fn test_table_values() {
let mut table = CompactedTable::new();
table.ingest(&[
make_record(Some("a"), Some("1"), 0, 0),
make_record(Some("b"), Some("2"), 0, 1),
]);
let mut values: Vec<&Bytes> = table.values().collect();
values.sort();
assert_eq!(values, vec![&Bytes::from("1"), &Bytes::from("2")]);
}
#[test]
fn test_table_owned_into_iterator() {
let mut table = CompactedTable::new();
table.ingest(&[
make_record(Some("a"), Some("1"), 0, 0),
make_record(Some("b"), Some("2"), 0, 1),
]);
let items: HashMap<Bytes, Bytes> = table.into_iter().collect();
assert_eq!(items.len(), 2);
assert_eq!(items.get(&Bytes::from("a")), Some(&Bytes::from("1")));
assert_eq!(items.get(&Bytes::from("b")), Some(&Bytes::from("2")));
}
#[test]
fn test_table_clone_preserves_state() {
let mut table = CompactedTable::new();
table.ingest(&[
make_record(Some("k1"), Some("v1"), 0, 0),
make_record(Some("k2"), Some("v2"), 0, 1),
make_record(Some("k3"), None, 0, 2), ]);
let cloned = table.clone();
assert_eq!(cloned.len(), table.len());
assert_eq!(cloned.get(b"k1"), table.get(b"k1"));
assert_eq!(cloned.get(b"k2"), table.get(b"k2"));
assert_eq!(cloned.records_processed(), table.records_processed());
assert_eq!(cloned.tombstones_processed(), table.tombstones_processed());
}
#[test]
fn test_table_ingest() {
let mut table = CompactedTable::new();
let records = vec![
make_record(Some("k1"), Some("v1"), 0, 0),
make_record(Some("k2"), Some("v2"), 0, 1),
make_record(None, Some("no-key"), 0, 2),
make_record(Some("k1"), None, 0, 3), ];
table.ingest(&records);
assert_eq!(table.len(), 1);
assert!(!table.contains_key(b"k1"));
assert_eq!(table.get(b"k2"), Some(&Bytes::from("v2")));
assert_eq!(table.records_processed(), 4);
assert_eq!(table.tombstones_processed(), 1);
}
#[test]
fn test_table_ingest_matches_apply_state() {
let records = vec![
make_record(Some("a"), Some("1"), 0, 0),
make_record(Some("b"), Some("2"), 1, 0),
make_record(Some("a"), Some("3"), 0, 1),
make_record(Some("b"), None, 1, 1),
];
let mut via_apply = CompactedTable::new();
let _ = via_apply.apply(&records);
let mut via_ingest = CompactedTable::new();
via_ingest.ingest(&records);
assert_eq!(via_apply, via_ingest);
assert_eq!(
via_apply.records_processed(),
via_ingest.records_processed()
);
assert_eq!(
via_apply.tombstones_processed(),
via_ingest.tombstones_processed()
);
}
#[test]
fn test_table_equality_ignores_counters() {
let mut t1 = CompactedTable::new();
t1.ingest(&[make_record(Some("k"), Some("v"), 0, 0)]);
let mut t2 = CompactedTable::new();
t2.ingest(&[
make_record(None, Some("noise"), 0, 0), make_record(Some("k"), Some("v"), 0, 1),
]);
assert_eq!(t1, t2);
assert_ne!(t1.records_processed(), t2.records_processed());
}
#[test]
fn test_table_same_key_lifecycle_in_single_batch() {
let mut table = CompactedTable::new();
let records = vec![
make_record(Some("x"), Some("v1"), 0, 0), make_record(Some("x"), Some("v2"), 0, 1), make_record(Some("x"), None, 0, 2), make_record(Some("x"), Some("v3"), 0, 3), ];
let changes = table.apply(&records);
assert_eq!(table.len(), 1);
assert_eq!(table.get(b"x"), Some(&Bytes::from("v3")));
assert_eq!(changes.len(), 4);
assert!(changes[0].is_insert());
assert_eq!(changes[0].old_value, None);
assert_eq!(changes[0].new_value, Some(Bytes::from("v1")));
assert!(changes[1].is_update());
assert_eq!(changes[1].old_value, Some(Bytes::from("v1")));
assert_eq!(changes[1].new_value, Some(Bytes::from("v2")));
assert!(changes[2].is_delete());
assert_eq!(changes[2].old_value, Some(Bytes::from("v2")));
assert!(changes[3].is_insert());
assert_eq!(changes[3].old_value, None);
assert_eq!(changes[3].new_value, Some(Bytes::from("v3")));
assert_eq!(table.records_processed(), 4);
assert_eq!(table.tombstones_processed(), 1);
}
#[test]
fn test_all_public_types_are_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<CompactedTable>();
assert_send_sync::<TableChange>();
assert_send_sync::<CompactedTopicConsumer>();
assert_send_sync::<CompactedTopicConsumerBuilder>();
}
#[tokio::test]
async fn test_builder_missing_bootstrap_servers() {
let result = CompactedTopicConsumerBuilder::default()
.topic("test")
.build()
.await;
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("bootstrap_servers")
);
}
#[tokio::test]
async fn test_builder_missing_topic() {
let result = CompactedTopicConsumerBuilder::default()
.bootstrap_servers("localhost:9092")
.build()
.await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("topic"));
}
}