use std::sync::Arc;
use bytes::Bytes;
use nisshi_sans_io::{ErrorCode, create_topics_request::CreatableTopic, record::deflated::Batch};
use slatedb::{Db, object_store::memory::InMemory};
use url::Url;
use crate::{
BrokerRegistrationRequest, Error, Storage, Topition, TxnAddPartitionsRequest,
TxnAddPartitionsResponse,
};
use super::engine::Engine;
/// Builds the [`Engine`] shared by the tests in this file.
///
/// The engine is backed by a SlateDB database opened at `test.slatedb` on a
/// fresh in-memory object store, and is constructed with cluster
/// `test-cluster`, the value `1`, and the URL `tcp://localhost:9092`.
///
/// # Panics
///
/// Panics if SlateDB cannot be opened or the URL fails to parse.
async fn create_test_engine() -> Engine {
    let backing_store = Arc::new(InMemory::new());
    let db = Db::open("test.slatedb", backing_store)
        .await
        .map(Arc::new)
        .expect("Failed to open SlateDB");
    let listener = Url::parse("tcp://localhost:9092").unwrap();

    Engine::new("test-cluster", 1, listener, db)
}
#[tokio::test]
async fn test_create_duplicate_topic() {
let engine = create_test_engine().await;
let topic = CreatableTopic::default()
.name("dup-topic".into())
.num_partitions(1)
.replication_factor(1);
let result = engine.create_topic(topic.clone(), false).await;
assert!(result.is_ok());
let result = engine.create_topic(topic, false).await;
assert!(matches!(
result,
Err(Error::Api(ErrorCode::TopicAlreadyExists))
));
}
/// A newly created single-partition topic reports `0` for its log start,
/// last stable offset and high watermark.
#[tokio::test]
async fn test_offset_stage() {
    const TOPIC: &str = "stage-topic";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(1)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    let partition = Topition::new(TOPIC, 0);
    let offsets = engine.offset_stage(&partition).await.unwrap();

    assert_eq!(0, offsets.log_start);
    assert_eq!(0, offsets.last_stable);
    assert_eq!(0, offsets.high_watermark);
}
/// The test engine lists exactly one broker: id `1`, host `localhost`,
/// port `9092`.
#[tokio::test]
async fn test_brokers() {
    let engine = create_test_engine().await;
    let listed = engine.brokers().await.unwrap();

    assert_eq!(1, listed.len());

    let only = &listed[0];
    assert_eq!(1, only.broker_id);
    assert_eq!("localhost", only.host.as_str());
    assert_eq!(9092, only.port);
}
/// `cluster_id` returns the cluster name the test engine was constructed with.
#[tokio::test]
async fn test_cluster_id() {
    let engine = create_test_engine().await;

    assert_eq!("test-cluster", engine.cluster_id().await.unwrap());
}
/// `node` returns `1` for the engine built by `create_test_engine`.
#[tokio::test]
async fn test_node() {
    let engine = create_test_engine().await;

    assert_eq!(1, engine.node().await.unwrap());
}
/// Adding partitions for a transactional id that was never initialised yields
/// `TransactionalIdNotFound` on the first partition result.
#[tokio::test]
async fn test_txn_add_partitions_unknown_txn() {
    use nisshi_sans_io::add_partitions_to_txn_request::AddPartitionsToTxnTopic;

    let engine = create_test_engine().await;

    let topics = vec![
        AddPartitionsToTxnTopic::default()
            .name("any-topic".into())
            .partitions(Some(vec![0])),
    ];
    let request = TxnAddPartitionsRequest::VersionZeroToThree {
        transaction_id: "unknown-txn".into(),
        producer_id: 1,
        producer_epoch: 0,
        topics,
    };

    let response = engine.txn_add_partitions(request).await.unwrap();
    let TxnAddPartitionsResponse::VersionZeroToThree(results) = response else {
        panic!("Expected VersionZeroToThree response");
    };

    let partitions = results[0].results_by_partition.as_ref().unwrap();
    let reported = ErrorCode::try_from(partitions[0].partition_error_code).unwrap();
    assert_eq!(ErrorCode::TransactionalIdNotFound, reported);
}
/// Produces two batches without a producer id and checks that the high
/// watermark and last stable offset are both `2`.
///
/// Note: despite its name, this test never calls `fetch`; it only inspects
/// `offset_stage`, which it reads twice.
#[tokio::test]
async fn test_fetch_isolation_levels() {
    const TOPIC: &str = "isolation-topic";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(1)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    let partition = Topition::new(TOPIC, 0);
    let empty_batch = Batch {
        magic: 2,
        attributes: 0,
        crc: 0,
        batch_length: 0,
        partition_leader_epoch: 0,
        base_offset: 0,
        last_offset_delta: 0,
        base_timestamp: 1000,
        max_timestamp: 1000,
        producer_id: -1,
        producer_epoch: -1,
        base_sequence: -1,
        record_count: 0,
        record_data: Bytes::new(),
    };

    // The same batch is appended twice.
    for _ in 0..2 {
        let _ = engine
            .produce(None, &partition, empty_batch.clone())
            .await
            .unwrap();
    }

    let first_read = engine.offset_stage(&partition).await.unwrap();
    assert_eq!(2, first_read.high_watermark);
    assert_eq!(2, first_read.last_stable);

    let second_read = engine.offset_stage(&partition).await.unwrap();
    assert_eq!(second_read.high_watermark, second_read.last_stable);
}
/// Tracks the high watermark and last stable offset around a transaction.
///
/// After one plain produce both offsets are `1`; after a transactional
/// producer adds the partition and the transaction is ended with `true`,
/// both offsets are expected to be `2`.
#[tokio::test]
async fn test_offset_stage_with_transaction() {
    use nisshi_sans_io::add_partitions_to_txn_request::AddPartitionsToTxnTopic;

    const TOPIC: &str = "stage-txn-topic";
    const TXN: &str = "stage-test-txn";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(1)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    let partition = Topition::new(TOPIC, 0);
    let plain_batch = Batch {
        magic: 2,
        attributes: 0,
        crc: 0,
        batch_length: 0,
        partition_leader_epoch: 0,
        base_offset: 0,
        last_offset_delta: 0,
        base_timestamp: 1000,
        max_timestamp: 1000,
        producer_id: -1,
        producer_epoch: -1,
        base_sequence: -1,
        record_count: 1,
        record_data: Bytes::new(),
    };
    let _ = engine
        .produce(None, &partition, plain_batch.clone())
        .await
        .unwrap();

    let before_txn = engine.offset_stage(&partition).await.unwrap();
    assert_eq!(1, before_txn.high_watermark);
    assert_eq!(1, before_txn.last_stable);

    let producer = engine
        .init_producer(Some(TXN), 60000, Some(-1), Some(-1))
        .await
        .unwrap();

    let topics = vec![
        AddPartitionsToTxnTopic::default()
            .name(TOPIC.into())
            .partitions(Some(vec![0])),
    ];
    let add_partitions = TxnAddPartitionsRequest::VersionZeroToThree {
        transaction_id: TXN.into(),
        producer_id: producer.id,
        producer_epoch: producer.epoch,
        topics,
    };
    let _ = engine.txn_add_partitions(add_partitions).await.unwrap();

    let _ = engine
        .txn_end(TXN, producer.id, producer.epoch, true)
        .await
        .unwrap();

    let after_txn = engine.offset_stage(&partition).await.unwrap();
    assert_eq!(2, after_txn.high_watermark);
    assert_eq!(2, after_txn.last_stable);
}
/// A batch stamped with the id and epoch of a producer initialised without a
/// transactional id, at sequence `0`, is accepted and lands at offset `0`.
#[tokio::test]
async fn test_idempotent_produce() {
    const TOPIC: &str = "idempotent-topic";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(1)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    let producer = engine
        .init_producer(None, 60000, Some(-1), Some(-1))
        .await
        .unwrap();

    let partition = Topition::new(TOPIC, 0);
    let first_batch = Batch {
        magic: 2,
        attributes: 0,
        crc: 0,
        batch_length: 0,
        partition_leader_epoch: 0,
        base_offset: 0,
        last_offset_delta: 0,
        base_timestamp: 1000,
        max_timestamp: 1000,
        producer_id: producer.id,
        producer_epoch: producer.epoch,
        base_sequence: 0,
        record_count: 1,
        record_data: Bytes::new(),
    };

    let assigned = engine.produce(None, &partition, first_batch).await.unwrap();
    assert_eq!(0, assigned);
}
/// Deleting records up to offset `3` on a partition holding five batches
/// reports a low watermark of `3` with no error, and moves the log start
/// from `0` to `3`.
#[tokio::test]
async fn test_delete_records() {
    use nisshi_sans_io::delete_records_request::{DeleteRecordsPartition, DeleteRecordsTopic};

    const TOPIC: &str = "delete-records-topic";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(1)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    let partition = Topition::new(TOPIC, 0);
    let one_record = Batch {
        magic: 2,
        attributes: 0,
        crc: 0,
        batch_length: 0,
        partition_leader_epoch: 0,
        base_offset: 0,
        last_offset_delta: 0,
        base_timestamp: 1000,
        max_timestamp: 1000,
        producer_id: -1,
        producer_epoch: -1,
        base_sequence: -1,
        record_count: 1,
        record_data: Bytes::new(),
    };
    for _ in 0..5 {
        let _ = engine
            .produce(None, &partition, one_record.clone())
            .await
            .unwrap();
    }

    let before = engine.offset_stage(&partition).await.unwrap();
    assert_eq!(0, before.log_start);
    assert_eq!(5, before.high_watermark);

    let truncate_at = DeleteRecordsPartition::default()
        .partition_index(0)
        .offset(3);
    let request = vec![
        DeleteRecordsTopic::default()
            .name(TOPIC.into())
            .partitions(Some(vec![truncate_at])),
    ];

    let outcome = engine.delete_records(&request).await.unwrap();
    assert_eq!(1, outcome.len());

    let per_partition = outcome[0].partitions.as_ref().unwrap();
    assert_eq!(1, per_partition.len());
    assert_eq!(
        ErrorCode::None,
        ErrorCode::try_from(per_partition[0].error_code).unwrap()
    );
    assert_eq!(3, per_partition[0].low_watermark);

    let after = engine.offset_stage(&partition).await.unwrap();
    assert_eq!(3, after.log_start);
}
/// Produces five batches and checks that the high watermark is `5`.
///
/// Note: despite its name, this test does not issue a fetch; the only
/// assertion is on `offset_stage`.
#[tokio::test]
async fn test_fetch_with_min_bytes() {
    const TOPIC: &str = "min-bytes-topic";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(1)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    let partition = Topition::new(TOPIC, 0);
    let empty_batch = Batch {
        magic: 2,
        attributes: 0,
        crc: 0,
        batch_length: 0,
        partition_leader_epoch: 0,
        base_offset: 0,
        last_offset_delta: 0,
        base_timestamp: 1000,
        max_timestamp: 1000,
        producer_id: -1,
        producer_epoch: -1,
        base_sequence: -1,
        record_count: 0,
        record_data: Bytes::new(),
    };
    for _ in 0..5 {
        let _ = engine
            .produce(None, &partition, empty_batch.clone())
            .await
            .unwrap();
    }

    let offsets = engine.offset_stage(&partition).await.unwrap();
    assert_eq!(5, offsets.high_watermark);
}
/// After registering broker `1` with rack `rack-1`, the broker listing still
/// has a single entry with id `1`, now carrying that rack.
#[tokio::test]
async fn test_register_broker() {
    let engine = create_test_engine().await;

    engine
        .register_broker(BrokerRegistrationRequest {
            cluster_id: "test-cluster".into(),
            broker_id: 1,
            rack: Some("rack-1".into()),
            incarnation_id: Default::default(),
        })
        .await
        .unwrap();

    let listed = engine.brokers().await.unwrap();
    assert_eq!(1, listed.len());

    let only = &listed[0];
    assert_eq!(1, only.broker_id);
    assert_eq!(Some("rack-1".to_string()), only.rack);
}
/// A `VersionFourPlus` add-partitions request for one transaction covering
/// partitions `0` and `1` of a topic comes back as a `VersionFourPlus`
/// response with one transaction, one topic and two partition results.
#[tokio::test]
async fn test_txn_add_partitions_version_four_plus() {
    use nisshi_sans_io::add_partitions_to_txn_request::{
        AddPartitionsToTxnTopic, AddPartitionsToTxnTransaction,
    };

    const TOPIC: &str = "v4-topic";
    const TXN: &str = "v4-txn";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(2)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    let producer = engine
        .init_producer(Some(TXN), 60000, Some(-1), Some(-1))
        .await
        .unwrap();

    let transaction = AddPartitionsToTxnTransaction::default()
        .transactional_id(TXN.into())
        .producer_id(producer.id)
        .producer_epoch(producer.epoch)
        .verify_only(false)
        .topics(Some(vec![
            AddPartitionsToTxnTopic::default()
                .name(TOPIC.into())
                .partitions(Some(vec![0, 1])),
        ]));
    let request = TxnAddPartitionsRequest::VersionFourPlus {
        transactions: vec![transaction],
    };

    let response = engine.txn_add_partitions(request).await.unwrap();
    let TxnAddPartitionsResponse::VersionFourPlus(results) = response else {
        panic!("Expected VersionFourPlus response");
    };

    assert_eq!(1, results.len());
    let by_txn = &results[0];
    assert_eq!("v4-txn", by_txn.transactional_id.as_str());

    let topic_results = by_txn.topic_results.as_ref().unwrap();
    assert_eq!(1, topic_results.len());
    assert_eq!("v4-topic", topic_results[0].name.as_str());

    let partitions = topic_results[0].results_by_partition.as_ref().unwrap();
    assert_eq!(2, partitions.len());
}
/// Adding partitions with a producer id (`9999`) that differs from the one
/// issued for the transactional id yields `UnknownProducerId` on the first
/// partition result.
#[tokio::test]
async fn test_txn_wrong_producer_id() {
    use nisshi_sans_io::add_partitions_to_txn_request::AddPartitionsToTxnTopic;

    const TXN: &str = "wrong-id-txn";

    let engine = create_test_engine().await;
    let _ = engine
        .init_producer(Some(TXN), 60000, Some(-1), Some(-1))
        .await
        .unwrap();

    let topics = vec![
        AddPartitionsToTxnTopic::default()
            .name("any-topic".into())
            .partitions(Some(vec![0])),
    ];
    let request = TxnAddPartitionsRequest::VersionZeroToThree {
        transaction_id: TXN.into(),
        producer_id: 9999,
        producer_epoch: 0,
        topics,
    };

    let response = engine.txn_add_partitions(request).await.unwrap();
    let TxnAddPartitionsResponse::VersionZeroToThree(results) = response else {
        panic!("Expected VersionZeroToThree response");
    };

    let partitions = results[0].results_by_partition.as_ref().unwrap();
    let reported = ErrorCode::try_from(partitions[0].partition_error_code).unwrap();
    assert_eq!(ErrorCode::UnknownProducerId, reported);
}
/// Adding partitions with the right producer id but epoch `99` yields
/// `ProducerFenced` on the first partition result.
#[tokio::test]
async fn test_txn_wrong_epoch() {
    use nisshi_sans_io::add_partitions_to_txn_request::AddPartitionsToTxnTopic;

    const TXN: &str = "wrong-epoch-txn";

    let engine = create_test_engine().await;
    let producer = engine
        .init_producer(Some(TXN), 60000, Some(-1), Some(-1))
        .await
        .unwrap();

    let topics = vec![
        AddPartitionsToTxnTopic::default()
            .name("any-topic".into())
            .partitions(Some(vec![0])),
    ];
    let request = TxnAddPartitionsRequest::VersionZeroToThree {
        transaction_id: TXN.into(),
        producer_id: producer.id,
        producer_epoch: 99,
        topics,
    };

    let response = engine.txn_add_partitions(request).await.unwrap();
    let TxnAddPartitionsResponse::VersionZeroToThree(results) = response else {
        panic!("Expected VersionZeroToThree response");
    };

    let partitions = results[0].results_by_partition.as_ref().unwrap();
    let reported = ErrorCode::try_from(partitions[0].partition_error_code).unwrap();
    assert_eq!(ErrorCode::ProducerFenced, reported);
}
/// Ending a transaction whose id was never initialised fails with
/// `TransactionalIdNotFound`.
#[tokio::test]
async fn test_txn_end_unknown_transaction() {
    let engine = create_test_engine().await;

    let outcome = engine.txn_end("nonexistent-txn", 1, 0, true).await;

    assert!(matches!(
        outcome,
        Err(Error::Api(ErrorCode::TransactionalIdNotFound))
    ));
}
/// Ending a known transaction with producer id `9999` instead of the issued
/// one fails with `UnknownProducerId`.
#[tokio::test]
async fn test_txn_end_wrong_producer() {
    const TXN: &str = "end-wrong-prod";

    let engine = create_test_engine().await;
    let producer = engine
        .init_producer(Some(TXN), 60000, Some(-1), Some(-1))
        .await
        .unwrap();

    let outcome = engine.txn_end(TXN, 9999, producer.epoch, true).await;

    assert!(matches!(
        outcome,
        Err(Error::Api(ErrorCode::UnknownProducerId))
    ));
}
#[tokio::test]
async fn test_txn_end_wrong_epoch() {
let engine = create_test_engine().await;
let producer = engine
.init_producer(Some("end-wrong-epoch"), 60000, Some(-1), Some(-1))
.await
.unwrap();
let result = engine
.txn_end("end-wrong-epoch", producer.id, 99, true)
.await;
assert!(matches!(result, Err(Error::Api(ErrorCode::ProducerFenced))));
}
/// Producing a batch stamped with producer id `9999`, which was never
/// initialised on this engine, fails with `UnknownProducerId`.
#[tokio::test]
async fn test_idempotent_unknown_producer() {
    const TOPIC: &str = "unknown-prod-topic";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(1)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    let partition = Topition::new(TOPIC, 0);
    let stray_batch = Batch {
        magic: 2,
        attributes: 0,
        crc: 0,
        batch_length: 0,
        partition_leader_epoch: 0,
        base_offset: 0,
        last_offset_delta: 0,
        base_timestamp: 1000,
        max_timestamp: 1000,
        producer_id: 9999,
        producer_epoch: 0,
        base_sequence: 0,
        record_count: 0,
        record_data: Bytes::new(),
    };

    let outcome = engine.produce(None, &partition, stray_batch).await;

    assert!(matches!(
        outcome,
        Err(Error::Api(ErrorCode::UnknownProducerId))
    ));
}
/// Deleting records on a topic that was never created returns one topic
/// result whose first partition carries `UnknownTopicOrPartition`.
#[tokio::test]
async fn test_delete_records_unknown_topic() {
    use nisshi_sans_io::delete_records_request::{DeleteRecordsPartition, DeleteRecordsTopic};

    let engine = create_test_engine().await;

    let target = DeleteRecordsPartition::default()
        .partition_index(0)
        .offset(5);
    let request = vec![
        DeleteRecordsTopic::default()
            .name("nonexistent-topic".into())
            .partitions(Some(vec![target])),
    ];

    let outcome = engine.delete_records(&request).await.unwrap();
    assert_eq!(1, outcome.len());

    let per_partition = outcome[0].partitions.as_ref().unwrap();
    let reported = ErrorCode::try_from(per_partition[0].error_code).unwrap();
    assert_eq!(ErrorCode::UnknownTopicOrPartition, reported);
}
/// Deleting records on partition `99` of a topic created with a single
/// partition returns `UnknownTopicOrPartition` for that partition.
#[tokio::test]
async fn test_delete_records_unknown_partition() {
    use nisshi_sans_io::delete_records_request::{DeleteRecordsPartition, DeleteRecordsTopic};

    const TOPIC: &str = "del-unknown-part";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(1)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    // Partition 99 does not exist on a one-partition topic.
    let target = DeleteRecordsPartition::default()
        .partition_index(99)
        .offset(5);
    let request = vec![
        DeleteRecordsTopic::default()
            .name(TOPIC.into())
            .partitions(Some(vec![target])),
    ];

    let outcome = engine.delete_records(&request).await.unwrap();
    assert_eq!(1, outcome.len());

    let per_partition = outcome[0].partitions.as_ref().unwrap();
    let reported = ErrorCode::try_from(per_partition[0].error_code).unwrap();
    assert_eq!(ErrorCode::UnknownTopicOrPartition, reported);
}
/// Each of the three partitions of a topic tracks its own offsets: the first
/// produce to every partition lands at offset `0`, after which every
/// partition has a high watermark of `1`.
#[tokio::test]
async fn test_produce_multiple_partitions() {
    const TOPIC: &str = "multi-part-topic";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(3)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    let empty_batch = Batch {
        magic: 2,
        attributes: 0,
        crc: 0,
        batch_length: 0,
        partition_leader_epoch: 0,
        base_offset: 0,
        last_offset_delta: 0,
        base_timestamp: 1000,
        max_timestamp: 1000,
        producer_id: -1,
        producer_epoch: -1,
        base_sequence: -1,
        record_count: 0,
        record_data: Bytes::new(),
    };

    // First pass: one produce per partition.
    for index in 0..3 {
        let target = Topition::new(TOPIC, index);
        let assigned = engine
            .produce(None, &target, empty_batch.clone())
            .await
            .unwrap();
        assert_eq!(0, assigned);
    }

    // Second pass: inspect every partition once all produces are done.
    for index in 0..3 {
        let target = Topition::new(TOPIC, index);
        let offsets = engine.offset_stage(&target).await.unwrap();
        assert_eq!(1, offsets.high_watermark);
    }
}
/// Adding offsets for group `group-1` to a transaction, using the producer id
/// and epoch issued for it, returns `ErrorCode::None`.
#[tokio::test]
async fn test_txn_add_offsets() {
    const TXN: &str = "add-offsets-txn";

    let engine = create_test_engine().await;
    let producer = engine
        .init_producer(Some(TXN), 60000, Some(-1), Some(-1))
        .await
        .unwrap();

    let reported = engine
        .txn_add_offsets(TXN, producer.id, producer.epoch, "group-1")
        .await
        .unwrap();

    assert_eq!(ErrorCode::None, reported);
}
/// Adding offsets to a transactional id that was never initialised returns
/// `TransactionalIdNotFound` as the call's (successful) result value.
#[tokio::test]
async fn test_txn_add_offsets_unknown_txn() {
    let engine = create_test_engine().await;

    let reported = engine
        .txn_add_offsets("unknown-txn", 1, 0, "group-1")
        .await
        .unwrap();

    assert_eq!(ErrorCode::TransactionalIdNotFound, reported);
}
/// Adding offsets with producer id `9999` instead of the issued one returns
/// `UnknownProducerId`.
#[tokio::test]
async fn test_txn_add_offsets_wrong_producer() {
    const TXN: &str = "add-offsets-wrong-prod";

    let engine = create_test_engine().await;
    let producer = engine
        .init_producer(Some(TXN), 60000, Some(-1), Some(-1))
        .await
        .unwrap();

    let reported = engine
        .txn_add_offsets(TXN, 9999, producer.epoch, "group-1")
        .await
        .unwrap();

    assert_eq!(ErrorCode::UnknownProducerId, reported);
}
/// Adding offsets with the issued producer id but epoch `99` returns
/// `ProducerFenced`.
#[tokio::test]
async fn test_txn_add_offsets_wrong_epoch() {
    const TXN: &str = "add-offsets-wrong-epoch";

    let engine = create_test_engine().await;
    let producer = engine
        .init_producer(Some(TXN), 60000, Some(-1), Some(-1))
        .await
        .unwrap();

    let reported = engine
        .txn_add_offsets(TXN, producer.id, 99, "group-1")
        .await
        .unwrap();

    assert_eq!(ErrorCode::ProducerFenced, reported);
}
/// Walks a SCRAM credential through lookup-miss, upsert, lookup-hit and
/// delete.
///
/// Also checks that the stored credential is not returned for the same user
/// under `Scram512`, nor for user `bob` under `Scram256`.
#[tokio::test]
async fn test_scram_credential_lifecycle() {
    use crate::ScramCredential;
    use nisshi_sans_io::ScramMechanism;

    const USER: &str = "alice";

    let engine = create_test_engine().await;

    // Nothing stored yet.
    let initial = engine
        .user_scram_credential(USER, ScramMechanism::Scram256)
        .await
        .unwrap();
    assert!(initial.is_none());

    let credential = ScramCredential {
        salt: Bytes::from_static(b"salt"),
        iterations: 4096,
        stored_key: Bytes::from_static(b"stored-key"),
        server_key: Bytes::from_static(b"server-key"),
    };
    engine
        .upsert_user_scram_credential(USER, ScramMechanism::Scram256, credential.clone())
        .await
        .unwrap();

    // The exact credential comes back for the same user and mechanism.
    let stored = engine
        .user_scram_credential(USER, ScramMechanism::Scram256)
        .await
        .unwrap();
    assert_eq!(Some(credential), stored);

    // A different mechanism for the same user is a miss.
    let other_mechanism = engine
        .user_scram_credential(USER, ScramMechanism::Scram512)
        .await
        .unwrap();
    assert!(other_mechanism.is_none());

    // A different user under the same mechanism is a miss.
    let other_user = engine
        .user_scram_credential("bob", ScramMechanism::Scram256)
        .await
        .unwrap();
    assert!(other_user.is_none());

    engine
        .delete_user_scram_credential(USER, ScramMechanism::Scram256)
        .await
        .unwrap();

    let after_delete = engine
        .user_scram_credential(USER, ScramMechanism::Scram256)
        .await
        .unwrap();
    assert!(after_delete.is_none());
}
/// `ListOffset::Earliest` follows the log start after `delete_records`.
///
/// With five batches produced, earliest is `0`; after deleting up to offset
/// `3`, earliest is `3` (with `ErrorCode::None`) and latest is `5`.
#[tokio::test]
async fn test_list_offsets_earliest_after_delete_records() {
    use nisshi_sans_io::delete_records_request::{DeleteRecordsPartition, DeleteRecordsTopic};
    use nisshi_sans_io::{IsolationLevel, ListOffset};

    const TOPIC: &str = "earliest-topic";

    let engine = create_test_engine().await;
    let spec = CreatableTopic::default()
        .name(TOPIC.into())
        .num_partitions(1)
        .replication_factor(1);
    let _ = engine.create_topic(spec, false).await.unwrap();

    let partition = Topition::new(TOPIC, 0);
    let one_record = Batch {
        magic: 2,
        attributes: 0,
        crc: 0,
        batch_length: 0,
        partition_leader_epoch: 0,
        base_offset: 0,
        last_offset_delta: 0,
        base_timestamp: 1000,
        max_timestamp: 1000,
        producer_id: -1,
        producer_epoch: -1,
        base_sequence: -1,
        record_count: 1,
        record_data: Bytes::new(),
    };
    for _ in 0..5 {
        let _ = engine
            .produce(None, &partition, one_record.clone())
            .await
            .unwrap();
    }

    let before_delete = engine
        .list_offsets(
            IsolationLevel::ReadUncommitted,
            &[(partition.clone(), ListOffset::Earliest)],
        )
        .await
        .unwrap();
    assert_eq!(Some(0), before_delete[0].1.offset);

    let truncate_at = DeleteRecordsPartition::default()
        .partition_index(0)
        .offset(3);
    let delete_request = vec![
        DeleteRecordsTopic::default()
            .name(TOPIC.into())
            .partitions(Some(vec![truncate_at])),
    ];
    let _ = engine.delete_records(&delete_request).await.unwrap();

    // Index 0 is the earliest query, index 1 the latest query.
    let after_delete = engine
        .list_offsets(
            IsolationLevel::ReadUncommitted,
            &[
                (partition.clone(), ListOffset::Earliest),
                (partition.clone(), ListOffset::Latest),
            ],
        )
        .await
        .unwrap();

    let earliest = &after_delete[0].1;
    assert_eq!(ErrorCode::None, earliest.error_code);
    assert_eq!(Some(3), earliest.offset);

    let latest = &after_delete[1].1;
    assert_eq!(Some(5), latest.offset);
}
/// Tests for what `Engine::maintain` does to a partition depending on the
/// `cleanup.policy` (and `retention.ms`) configured on its topic.
mod cleanup_policy {
    use std::time::{Duration, SystemTime};

    use nisshi_sans_io::{
        IsolationLevel,
        create_topics_request::CreatableTopicConfig,
        record::{Record, inflated},
    };

    use super::*;

    /// Creates a single-partition topic called `name` carrying the given
    /// `(config name, config value)` pairs, and returns its partition `0`.
    async fn topic_with_policy(engine: &Engine, name: &str, configs: &[(&str, &str)]) -> Topition {
        let topic = CreatableTopic::default()
            .name(name.into())
            .num_partitions(1)
            .replication_factor(1)
            .configs(Some(
                configs
                    .iter()
                    .map(|(key, value)| {
                        CreatableTopicConfig::default()
                            .name((*key).into())
                            .value(Some((*value).into()))
                    })
                    .collect(),
            ));

        let _ = engine.create_topic(topic, false).await.unwrap();

        Topition::new(name, 0)
    }

    /// Builds a deflated batch holding a single `key`/`value` record.
    ///
    /// When `timestamp` is given it is used as both the base and the max
    /// timestamp of the batch; otherwise the builder's own timestamps are kept.
    fn keyed_batch(key: &'static [u8], value: &'static [u8], timestamp: Option<i64>) -> Batch {
        let mut builder = inflated::Batch::builder().record(
            Record::builder()
                .key(Some(Bytes::from_static(key)))
                .value(Some(Bytes::from_static(value))),
        );

        if let Some(timestamp) = timestamp {
            builder = builder.base_timestamp(timestamp).max_timestamp(timestamp);
        }

        builder.build().and_then(Batch::try_from).unwrap()
    }

    /// Fetches from `topition` with `ReadUncommitted` isolation and a zero
    /// duration, returning the batches.
    async fn fetch_all(engine: &Engine, topition: &Topition) -> Vec<Batch> {
        engine
            .fetch(
                topition,
                // NOTE(review): presumably start offset 0, min bytes 1 and
                // max bytes 1 MiB — confirm against `Engine::fetch`.
                0,
                1,
                1024 * 1024,
                IsolationLevel::ReadUncommitted,
                Duration::ZERO,
            )
            .await
            .unwrap()
    }

    /// Current wall-clock time as milliseconds since the Unix epoch.
    ///
    /// # Panics
    ///
    /// Panics if the system clock is before the Unix epoch, or if the
    /// millisecond count does not fit in an `i64`.
    fn now_millis() -> i64 {
        let since_epoch = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("system clock should not be before the Unix epoch");

        // `as_millis` returns `u128`; convert with a check rather than a
        // silently truncating `as` cast.
        i64::try_from(since_epoch.as_millis())
            .expect("milliseconds since the Unix epoch should fit in i64")
    }

    /// Returns the millisecond timestamp that lies `age` before `now`.
    ///
    /// # Panics
    ///
    /// Panics if `age` in milliseconds does not fit in an `i64`.
    fn millis_before(now: i64, age: Duration) -> i64 {
        now - i64::try_from(age.as_millis()).expect("age in milliseconds should fit in i64")
    }

    /// With `cleanup.policy=compact`, maintenance drops the older record for
    /// key `a`, leaving the batches at offsets 1 and 2, and a second
    /// maintenance pass leaves those two batches in place.
    #[tokio::test]
    async fn compact_keeps_latest_record_per_key() {
        let engine = create_test_engine().await;
        let topition =
            topic_with_policy(&engine, "policy-compact", &[("cleanup.policy", "compact")]).await;

        for batch in [
            keyed_batch(b"a", b"one", None),
            keyed_batch(b"b", b"first", None),
            keyed_batch(b"a", b"two", None),
        ] {
            let _ = engine.produce(None, &topition, batch).await.unwrap();
        }

        engine.maintain(SystemTime::now()).await.unwrap();

        let stage = engine.offset_stage(&topition).await.unwrap();
        assert_eq!(1, stage.log_start);
        assert_eq!(3, stage.high_watermark);

        let batches = fetch_all(&engine, &topition).await;
        assert_eq!(2, batches.len());
        assert_eq!(1, batches[0].base_offset);
        assert_eq!(2, batches[1].base_offset);

        let records = inflated::Batch::try_from(batches[1].clone())
            .unwrap()
            .records;
        assert_eq!(1, records.len());
        assert_eq!(Some(Bytes::from_static(b"two")), records[0].value);

        // Running maintenance again must not remove anything further.
        engine.maintain(SystemTime::now()).await.unwrap();
        assert_eq!(2, fetch_all(&engine, &topition).await.len());
    }

    /// With `cleanup.policy=compact`, a two-record batch whose first record is
    /// superseded by a later batch is rewritten to hold only the surviving
    /// record, which keeps its original offset delta.
    #[tokio::test]
    async fn compact_rewrites_partial_batch() {
        let engine = create_test_engine().await;
        let topition = topic_with_policy(
            &engine,
            "policy-compact-partial",
            &[("cleanup.policy", "compact")],
        )
        .await;

        let batch = inflated::Batch::builder()
            .record(
                Record::builder()
                    .key(Some(Bytes::from_static(b"a")))
                    .value(Some(Bytes::from_static(b"one"))),
            )
            .record(
                Record::builder()
                    .key(Some(Bytes::from_static(b"b")))
                    .value(Some(Bytes::from_static(b"first")))
                    .offset_delta(1),
            )
            .last_offset_delta(1)
            .build()
            .and_then(Batch::try_from)
            .unwrap();
        let _ = engine.produce(None, &topition, batch).await.unwrap();

        let _ = engine
            .produce(None, &topition, keyed_batch(b"a", b"two", None))
            .await
            .unwrap();

        engine.maintain(SystemTime::now()).await.unwrap();

        let stage = engine.offset_stage(&topition).await.unwrap();
        assert_eq!(0, stage.log_start);
        assert_eq!(3, stage.high_watermark);

        let batches = fetch_all(&engine, &topition).await;
        assert_eq!(2, batches.len());
        assert_eq!(0, batches[0].base_offset);

        let records = inflated::Batch::try_from(batches[0].clone())
            .unwrap()
            .records;
        assert_eq!(1, records.len());
        assert_eq!(Some(Bytes::from_static(b"b")), records[0].key);
        assert_eq!(Some(Bytes::from_static(b"first")), records[0].value);
        assert_eq!(1, records[0].offset_delta);
    }

    /// With `cleanup.policy=delete` and `retention.ms=60000`, maintenance
    /// removes the two batches timestamped five minutes ago and keeps the one
    /// timestamped now.
    #[tokio::test]
    async fn delete_removes_expired_prefix() {
        let engine = create_test_engine().await;
        let topition = topic_with_policy(
            &engine,
            "policy-delete",
            &[("cleanup.policy", "delete"), ("retention.ms", "60000")],
        )
        .await;

        let now = now_millis();
        let expired = millis_before(now, Duration::from_mins(5));

        for batch in [
            keyed_batch(b"a", b"one", Some(expired)),
            keyed_batch(b"a", b"two", Some(expired + 1)),
            keyed_batch(b"a", b"three", Some(now)),
        ] {
            let _ = engine.produce(None, &topition, batch).await.unwrap();
        }

        engine.maintain(SystemTime::now()).await.unwrap();

        let stage = engine.offset_stage(&topition).await.unwrap();
        assert_eq!(2, stage.log_start);
        assert_eq!(3, stage.high_watermark);

        let batches = fetch_all(&engine, &topition).await;
        assert_eq!(1, batches.len());
        assert_eq!(2, batches[0].base_offset);
    }

    /// When every batch has expired, the log start advances to the high
    /// watermark, fetch returns nothing, `ListOffset::Earliest` reports that
    /// offset, and the next produce continues from it.
    #[tokio::test]
    async fn delete_whole_partition_meets_high_watermark() {
        use nisshi_sans_io::ListOffset;

        let engine = create_test_engine().await;
        let topition = topic_with_policy(
            &engine,
            "policy-delete-all",
            &[("cleanup.policy", "delete"), ("retention.ms", "60000")],
        )
        .await;

        let expired = millis_before(now_millis(), Duration::from_mins(5));

        for batch in [
            keyed_batch(b"a", b"one", Some(expired)),
            keyed_batch(b"a", b"two", Some(expired + 1)),
        ] {
            let _ = engine.produce(None, &topition, batch).await.unwrap();
        }

        engine.maintain(SystemTime::now()).await.unwrap();

        let stage = engine.offset_stage(&topition).await.unwrap();
        assert_eq!(2, stage.log_start);
        assert_eq!(2, stage.high_watermark);
        assert!(fetch_all(&engine, &topition).await.is_empty());

        let responses = engine
            .list_offsets(
                IsolationLevel::ReadUncommitted,
                &[(topition.clone(), ListOffset::Earliest)],
            )
            .await
            .unwrap();
        assert_eq!(Some(2), responses[0].1.offset);

        // Offsets are not reused after the partition has been emptied.
        let offset = engine
            .produce(None, &topition, keyed_batch(b"a", b"three", None))
            .await
            .unwrap();
        assert_eq!(2, offset);
    }

    /// A topic created without any configs keeps batches timestamped thirty
    /// days ago when maintenance runs.
    #[tokio::test]
    async fn no_policy_is_untouched() {
        let engine = create_test_engine().await;
        let topition = topic_with_policy(&engine, "policy-none", &[]).await;

        let expired = millis_before(now_millis(), Duration::from_hours(30 * 24));

        for batch in [
            keyed_batch(b"a", b"one", Some(expired)),
            keyed_batch(b"a", b"two", Some(expired + 1)),
        ] {
            let _ = engine.produce(None, &topition, batch).await.unwrap();
        }

        engine.maintain(SystemTime::now()).await.unwrap();

        let stage = engine.offset_stage(&topition).await.unwrap();
        assert_eq!(0, stage.log_start);
        assert_eq!(2, stage.high_watermark);
        assert_eq!(2, fetch_all(&engine, &topition).await.len());
    }
}
/// An engine assembled through `Engine::builder()` reports the cluster name
/// and node id that were passed to the builder.
#[tokio::test]
async fn test_builder_pattern() {
    let backing_store = Arc::new(InMemory::new());
    let db = Db::open("builder-test.slatedb", backing_store)
        .await
        .map(Arc::new)
        .expect("Failed to open SlateDB");
    let listener = Url::parse("tcp://10.0.0.1:9093").unwrap();

    let engine = Engine::builder()
        .cluster("builder-cluster")
        .node(42)
        .advertised_listener(listener)
        .db(db)
        .schemas(None)
        .lake(None)
        .build();

    let cluster = engine.cluster_id().await.unwrap();
    assert_eq!("builder-cluster", cluster);

    let node = engine.node().await.unwrap();
    assert_eq!(42, node);
}