#![allow(clippy::unused_async)]
use crate::cx::Cx;
#[cfg(feature = "kafka")]
use crate::messaging::kafka::apply_security_config;
use crate::messaging::kafka::{
KafkaError, KafkaSaslConfig, KafkaSecurityConfig, KafkaTlsConfig, is_loopback_bootstrap_server,
};
#[cfg(all(test, not(feature = "kafka")))]
use crate::messaging::kafka::{
deterministic_broker_end_offset, deterministic_broker_fetch, deterministic_broker_notify,
};
use crate::sync::Notify;
#[cfg(any(feature = "kafka", all(test, not(feature = "kafka"))))]
use crate::time::Sleep;
use parking_lot::Mutex;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
#[cfg(any(feature = "kafka", all(test, not(feature = "kafka"))))]
use std::future::Future;
#[cfg(any(feature = "kafka", all(test, not(feature = "kafka"))))]
use std::pin::Pin;
#[cfg(any(test, feature = "kafka"))]
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(any(feature = "kafka", all(test, not(feature = "kafka"))))]
use std::task::Poll;
use std::time::Duration;
#[cfg(feature = "kafka")]
use rdkafka::{
consumer::{BaseConsumer, CommitMode, Consumer},
error::KafkaError as RdKafkaError,
message::{Headers, Message},
topic_partition_list::{Offset, TopicPartitionList},
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AutoOffsetReset {
Earliest,
#[default]
Latest,
None,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
#[default]
ReadUncommitted,
ReadCommitted,
}
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
pub bootstrap_servers: Vec<String>,
pub group_id: String,
pub client_id: Option<String>,
pub session_timeout: Duration,
pub heartbeat_interval: Duration,
pub auto_offset_reset: AutoOffsetReset,
pub enable_auto_commit: bool,
pub auto_commit_interval: Duration,
pub max_poll_records: usize,
pub fetch_min_bytes: usize,
pub fetch_max_bytes: usize,
pub fetch_max_wait: Duration,
pub isolation_level: IsolationLevel,
pub security: KafkaSecurityConfig,
pub force_real_kafka: bool,
pub retries: u32,
allow_insecure_transport_for_testing: bool,
}
impl Default for ConsumerConfig {
fn default() -> Self {
Self {
bootstrap_servers: vec!["localhost:9092".to_string()],
group_id: "asupersync-default".to_string(),
client_id: None,
session_timeout: Duration::from_secs(45),
heartbeat_interval: Duration::from_secs(3),
auto_offset_reset: AutoOffsetReset::Latest,
enable_auto_commit: false,
auto_commit_interval: Duration::from_secs(5),
max_poll_records: 500,
fetch_min_bytes: 1,
fetch_max_bytes: 50 * 1024 * 1024,
fetch_max_wait: Duration::from_millis(500),
isolation_level: IsolationLevel::ReadUncommitted,
security: KafkaSecurityConfig::default(),
force_real_kafka: false,
retries: 3,
allow_insecure_transport_for_testing: false,
}
}
}
impl ConsumerConfig {
#[must_use]
pub fn new(bootstrap_servers: Vec<String>, group_id: impl Into<String>) -> Self {
Self {
bootstrap_servers,
group_id: group_id.into(),
..Default::default()
}
}
#[must_use]
pub fn client_id(mut self, client_id: &str) -> Self {
self.client_id = Some(client_id.to_string());
self
}
#[must_use]
pub fn session_timeout(mut self, timeout: Duration) -> Self {
self.session_timeout = timeout;
self
}
#[must_use]
pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
self.heartbeat_interval = interval;
self
}
#[must_use]
pub const fn auto_offset_reset(mut self, reset: AutoOffsetReset) -> Self {
self.auto_offset_reset = reset;
self
}
#[must_use]
pub const fn enable_auto_commit(mut self, enable: bool) -> Self {
self.enable_auto_commit = enable;
self
}
#[must_use]
pub fn auto_commit_interval(mut self, interval: Duration) -> Self {
self.auto_commit_interval = interval;
self
}
#[must_use]
pub const fn max_poll_records(mut self, max: usize) -> Self {
self.max_poll_records = max;
self
}
#[must_use]
pub const fn fetch_min_bytes(mut self, min: usize) -> Self {
self.fetch_min_bytes = min;
self
}
#[must_use]
pub const fn fetch_max_bytes(mut self, max: usize) -> Self {
self.fetch_max_bytes = max;
self
}
#[must_use]
pub fn fetch_max_wait(mut self, wait: Duration) -> Self {
self.fetch_max_wait = wait;
self
}
#[must_use]
pub const fn isolation_level(mut self, level: IsolationLevel) -> Self {
self.isolation_level = level;
self
}
#[must_use]
pub const fn force_real_kafka(mut self, force: bool) -> Self {
self.force_real_kafka = force;
self
}
#[must_use]
pub const fn retries(mut self, retries: u32) -> Self {
self.retries = retries;
self
}
#[must_use]
pub fn security(mut self, security: KafkaSecurityConfig) -> Self {
self.security = security;
self
}
#[must_use]
pub fn tls(self, tls: KafkaTlsConfig) -> Self {
self.security(KafkaSecurityConfig::Tls(tls))
}
#[must_use]
pub fn sasl_scram_sha_256(
self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.security(KafkaSecurityConfig::SaslSsl(
KafkaSaslConfig::scram_sha_256(username, password),
))
}
#[must_use]
pub fn sasl_scram_sha_512(
self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.security(KafkaSecurityConfig::SaslSsl(
KafkaSaslConfig::scram_sha_512(username, password),
))
}
#[cfg(any(test, debug_assertions))]
#[must_use]
pub const fn allow_insecure_transport_for_testing(mut self, allow: bool) -> Self {
self.allow_insecure_transport_for_testing = allow;
self
}
pub fn validate(&self) -> Result<(), KafkaError> {
if self.bootstrap_servers.is_empty() {
return Err(KafkaError::Config(
"bootstrap_servers cannot be empty".to_string(),
));
}
if self.group_id.trim().is_empty() {
return Err(KafkaError::Config("group_id cannot be empty".to_string()));
}
if self.max_poll_records == 0 {
return Err(KafkaError::Config(
"max_poll_records must be > 0".to_string(),
));
}
if self.fetch_min_bytes > self.fetch_max_bytes {
return Err(KafkaError::Config(
"fetch_min_bytes must be <= fetch_max_bytes".to_string(),
));
}
self.security.validate()?;
if !self.allow_insecure_transport_for_testing && !self.security.is_remote_secure() {
for server in &self.bootstrap_servers {
if !is_loopback_bootstrap_server(server) {
return Err(KafkaError::Config(format!(
"remote Kafka bootstrap server '{server}' is rejected by default: \
configure TLS or SASL_SSL SCRAM security for non-loopback brokers"
)));
}
}
}
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicPartitionOffset {
pub topic: String,
pub partition: i32,
pub offset: i64,
}
impl TopicPartitionOffset {
#[must_use]
pub fn new(topic: impl Into<String>, partition: i32, offset: i64) -> Self {
Self {
topic: topic.into(),
partition,
offset,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RebalanceResult {
pub generation: u64,
pub assigned: Vec<(String, i32)>,
pub revoked: Vec<(String, i32)>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerRecord {
pub topic: String,
pub partition: i32,
pub offset: i64,
pub key: Option<Vec<u8>>,
pub payload: Vec<u8>,
pub timestamp: Option<i64>,
pub headers: Vec<(String, Vec<u8>)>,
}
#[cfg(any(feature = "kafka", all(test, not(feature = "kafka"))))]
fn duration_to_nanos(duration: Duration) -> u64 {
duration.as_nanos().min(u128::from(u64::MAX)) as u64
}
fn broker_operations_feature_disabled_for_build() -> bool {
#[cfg(all(not(test), not(feature = "kafka")))]
{
true
}
#[cfg(any(test, feature = "kafka"))]
{
false
}
}
#[cfg(feature = "kafka")]
const MAX_BROKER_POLL_SLICE: Duration = Duration::from_millis(50);
pub struct KafkaConsumer {
config: ConsumerConfig,
state: Mutex<ConsumerState>,
closed: AtomicBool,
state_notify: Notify,
#[cfg(feature = "kafka")]
consumer: Option<Arc<BaseConsumer>>,
#[cfg(feature = "kafka")]
broker_ops: Option<Arc<Mutex<()>>>,
#[cfg(feature = "kafka")]
buffered_outcome: Arc<Mutex<Option<Result<BrokerPollOutcome, KafkaError>>>>,
#[cfg(test)]
rebalance_after_open_hook: Mutex<Option<Arc<RebalanceAfterOpenHook>>>,
#[cfg(all(test, not(feature = "kafka")))]
poll_before_wait_hook: Mutex<Option<Arc<PollBeforeWaitHook>>>,
}
impl fmt::Debug for KafkaConsumer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("KafkaConsumer")
.field("config", &self.config)
.field("state", &self.state)
.field("closed", &self.closed.load(Ordering::Acquire))
.finish_non_exhaustive()
}
}
#[derive(Debug, Default)]
struct ConsumerState {
subscribed_topics: BTreeSet<String>,
assigned_partitions: BTreeSet<(String, i32)>,
committed_offsets: BTreeMap<(String, i32), i64>,
positions: BTreeMap<(String, i32), i64>,
#[cfg(all(test, not(feature = "kafka")))]
poll_cursor: usize,
rebalance_generation: u64,
last_revoked_partitions: BTreeSet<(String, i32)>,
}
#[cfg(test)]
#[derive(Debug)]
struct RebalanceAfterOpenHook {
arrived: std::sync::Barrier,
release: std::sync::Barrier,
}
#[cfg(all(test, not(feature = "kafka")))]
#[derive(Debug)]
struct PollBeforeWaitHook {
arrived: std::sync::Barrier,
release: std::sync::Barrier,
}
#[cfg(all(test, not(feature = "kafka")))]
impl PollBeforeWaitHook {
fn new() -> Self {
Self {
arrived: std::sync::Barrier::new(2),
release: std::sync::Barrier::new(2),
}
}
}
#[cfg(test)]
impl RebalanceAfterOpenHook {
fn new() -> Self {
Self {
arrived: std::sync::Barrier::new(2),
release: std::sync::Barrier::new(2),
}
}
}
#[cfg(feature = "kafka")]
#[derive(Debug, Default)]
struct BrokerSnapshot {
assigned_partitions: BTreeSet<(String, i32)>,
positions: BTreeMap<(String, i32), i64>,
}
#[cfg(feature = "kafka")]
#[derive(Debug)]
struct BrokerPollOutcome {
record: Option<ConsumerRecord>,
snapshot: BrokerSnapshot,
}
#[cfg(feature = "kafka")]
fn auto_offset_reset_str(reset: AutoOffsetReset) -> &'static str {
match reset {
AutoOffsetReset::Earliest => "earliest",
AutoOffsetReset::Latest => "latest",
AutoOffsetReset::None => "error",
}
}
#[cfg(feature = "kafka")]
fn isolation_level_str(level: IsolationLevel) -> &'static str {
match level {
IsolationLevel::ReadUncommitted => "read_uncommitted",
IsolationLevel::ReadCommitted => "read_committed",
}
}
#[cfg(feature = "kafka")]
fn duration_to_millis(duration: Duration) -> u64 {
duration.as_millis().min(u128::from(u64::MAX)) as u64
}
#[cfg(feature = "kafka")]
fn build_consumer_config(config: &ConsumerConfig) -> rdkafka::ClientConfig {
let mut client = rdkafka::ClientConfig::new();
client.set("bootstrap.servers", config.bootstrap_servers.join(","));
apply_security_config(&mut client, &config.security);
client.set("group.id", &config.group_id);
if let Some(client_id) = &config.client_id {
client.set("client.id", client_id);
}
client.set(
"session.timeout.ms",
duration_to_millis(config.session_timeout).to_string(),
);
client.set(
"heartbeat.interval.ms",
duration_to_millis(config.heartbeat_interval).to_string(),
);
client.set(
"auto.offset.reset",
auto_offset_reset_str(config.auto_offset_reset),
);
client.set("enable.auto.commit", config.enable_auto_commit.to_string());
client.set("enable.auto.offset.store", "false");
client.set(
"auto.commit.interval.ms",
duration_to_millis(config.auto_commit_interval).to_string(),
);
client.set("fetch.min.bytes", config.fetch_min_bytes.to_string());
client.set("fetch.max.bytes", config.fetch_max_bytes.to_string());
client.set(
"fetch.wait.max.ms",
duration_to_millis(config.fetch_max_wait).to_string(),
);
client.set(
"isolation.level",
isolation_level_str(config.isolation_level),
);
client.set("enable.partition.eof", "true");
client
}
#[cfg(feature = "kafka")]
fn map_consumer_error(err: RdKafkaError) -> KafkaError {
match err {
RdKafkaError::Canceled => KafkaError::Cancelled,
RdKafkaError::ClientConfig(_, desc, key, value) => {
KafkaError::Config(format!("{desc} (key: {key}, value: {value})"))
}
RdKafkaError::ClientCreation(msg) | RdKafkaError::Subscription(msg) => {
KafkaError::Config(msg)
}
_ => KafkaError::Broker(err.to_string()),
}
}
#[cfg(feature = "kafka")]
fn consumer_retry_backoff(config: &ConsumerConfig, attempt: u32) -> Duration {
let base_ms = config.heartbeat_interval.as_millis().max(1) as u64;
let exp = 1_u64 << attempt.min(6);
Duration::from_millis(base_ms.saturating_mul(exp).min(5000))
}
#[cfg(feature = "kafka")]
async fn wait_consumer_retry_backoff(cx: &Cx, delay: Duration) -> Result<(), KafkaError> {
if delay.is_zero() {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
crate::runtime::yield_now().await;
return cx.checkpoint().map_err(|_| KafkaError::Cancelled);
}
let now = cx
.timer_driver()
.map_or_else(crate::time::wall_now, |driver| driver.now());
let mut sleeper = crate::time::sleep(now, delay);
std::future::poll_fn(|task_cx| {
if cx.checkpoint().is_err() {
return std::task::Poll::Ready(Err(KafkaError::Cancelled));
}
std::pin::Pin::new(&mut sleeper)
.poll(task_cx)
.map(|()| Ok(()))
})
.await
}
#[cfg(feature = "kafka")]
async fn retry_consumer_operation<T, F>(
cx: &Cx,
config: &ConsumerConfig,
mut attempt_operation: F,
) -> Result<T, KafkaError>
where
F: FnMut() -> Result<T, KafkaError>,
{
let mut attempt = 0;
loop {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
match attempt_operation() {
Ok(value) => return Ok(value),
Err(err) if err.is_retryable() && attempt < config.retries => {
let delay = consumer_retry_backoff(config, attempt);
attempt = attempt.saturating_add(1);
wait_consumer_retry_backoff(cx, delay).await?;
}
Err(err) => return Err(err),
}
}
}
#[cfg(feature = "kafka")]
fn offset_from_rdkafka(offset: Offset) -> Option<i64> {
match offset {
Offset::Offset(value) if value >= 0 => Some(value),
_ => None,
}
}
#[cfg(feature = "kafka")]
fn broker_snapshot_from_topic_maps(
assigned: BTreeSet<(String, i32)>,
positions: BTreeMap<(String, i32), i64>,
) -> BrokerSnapshot {
BrokerSnapshot {
assigned_partitions: assigned,
positions,
}
}
#[cfg(feature = "kafka")]
fn capture_broker_snapshot(consumer: &BaseConsumer) -> Result<BrokerSnapshot, KafkaError> {
let assignment = consumer.assignment().map_err(map_consumer_error)?;
let assigned_partitions: BTreeSet<(String, i32)> =
assignment.to_topic_map().into_keys().collect();
let positions = if assigned_partitions.is_empty() {
BTreeMap::new()
} else {
consumer
.position()
.map_err(map_consumer_error)?
.to_topic_map()
.into_iter()
.filter_map(|(key, offset)| offset_from_rdkafka(offset).map(|offset| (key, offset)))
.collect()
};
Ok(broker_snapshot_from_topic_maps(
assigned_partitions,
positions,
))
}
#[cfg(feature = "kafka")]
fn consumer_record_from_message(message: &rdkafka::message::BorrowedMessage<'_>) -> ConsumerRecord {
let headers = message
.headers()
.map(|headers| {
(0..headers.count())
.map(|index| {
let header = headers.get(index);
(
header.key.to_string(),
header
.value
.map_or_else(Vec::new, std::borrow::ToOwned::to_owned),
)
})
.collect()
})
.unwrap_or_default();
ConsumerRecord {
topic: message.topic().to_string(),
partition: message.partition(),
offset: message.offset(),
key: message.key().map(std::borrow::ToOwned::to_owned),
payload: message
.payload()
.map_or_else(Vec::new, std::borrow::ToOwned::to_owned),
timestamp: message.timestamp().to_millis(),
headers,
}
}
#[cfg(feature = "kafka")]
fn apply_broker_snapshot(state: &mut ConsumerState, snapshot: BrokerSnapshot) {
let previous_assignments = state.assigned_partitions.clone();
if previous_assignments != snapshot.assigned_partitions {
state.rebalance_generation = state.rebalance_generation.saturating_add(1);
state.last_revoked_partitions = previous_assignments
.difference(&snapshot.assigned_partitions)
.cloned()
.collect();
}
state.assigned_partitions = snapshot.assigned_partitions;
state
.positions
.retain(|key, _| state.assigned_partitions.contains(key));
for (key, offset) in snapshot.positions {
if state.assigned_partitions.contains(&key) {
state.positions.insert(key, offset);
}
}
}
impl KafkaConsumer {
pub fn new(config: ConsumerConfig) -> Result<Self, KafkaError> {
config.validate()?;
#[cfg(feature = "kafka")]
let consumer = if cfg!(not(test)) || config.force_real_kafka {
Some(
build_consumer_config(&config)
.create::<BaseConsumer>()
.map_err(map_consumer_error)?,
)
} else {
None
};
#[cfg(feature = "kafka")]
let consumer = consumer.map(Arc::new);
#[cfg(feature = "kafka")]
let broker_ops = consumer.as_ref().map(|_| Arc::new(Mutex::new(())));
Ok(Self {
config,
state: Mutex::new(ConsumerState::default()),
closed: AtomicBool::new(false),
state_notify: Notify::new(),
#[cfg(feature = "kafka")]
consumer,
#[cfg(feature = "kafka")]
broker_ops,
#[cfg(feature = "kafka")]
buffered_outcome: Arc::new(Mutex::new(None)),
#[cfg(test)]
rebalance_after_open_hook: Mutex::new(None),
#[cfg(all(test, not(feature = "kafka")))]
poll_before_wait_hook: Mutex::new(None),
})
}
#[cfg(feature = "kafka")]
fn broker_backend(&self) -> Option<(Arc<BaseConsumer>, Arc<Mutex<()>>)> {
self.consumer
.as_ref()
.zip(self.broker_ops.as_ref())
.map(|(consumer, broker_ops)| (Arc::clone(consumer), Arc::clone(broker_ops)))
}
#[cfg(test)]
fn install_rebalance_after_open_hook(&self, hook: Arc<RebalanceAfterOpenHook>) {
*self.rebalance_after_open_hook.lock() = Some(hook);
}
#[cfg(all(test, not(feature = "kafka")))]
fn install_poll_before_wait_hook(&self, hook: Arc<PollBeforeWaitHook>) {
*self.poll_before_wait_hook.lock() = Some(hook);
}
#[allow(unused_variables)]
pub async fn subscribe(&self, cx: &Cx, topics: &[&str]) -> Result<(), KafkaError> {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
self.ensure_open()?;
if topics.is_empty() {
return Err(KafkaError::Config("topics cannot be empty".to_string()));
}
let mut normalized = BTreeSet::new();
for topic in topics {
let topic = topic.trim();
if topic.is_empty() {
return Err(KafkaError::Config("topic cannot be empty".to_string()));
}
normalized.insert(topic.to_string());
}
#[cfg(feature = "kafka")]
if let Some((consumer, broker_ops)) = self.broker_backend() {
let topic_list: Vec<String> = normalized.iter().cloned().collect();
crate::runtime::spawn_blocking::spawn_blocking_on_thread(move || {
let _guard = broker_ops.lock();
let topic_refs: Vec<&str> = topic_list.iter().map(String::as_str).collect();
consumer.subscribe(&topic_refs).map_err(map_consumer_error)
})
.await?;
}
if broker_operations_feature_disabled_for_build() {
return Err(KafkaError::FeatureDisabled);
}
let mut state = self.state.lock();
if self.closed.load(Ordering::Acquire) {
return Err(KafkaError::Config("consumer is closed".to_string()));
}
state.subscribed_topics = normalized;
#[cfg(all(test, not(feature = "kafka")))]
{
state.assigned_partitions = state
.subscribed_topics
.iter()
.cloned()
.map(|topic| (topic, 0))
.collect();
}
#[cfg(feature = "kafka")]
{
if self.broker_backend().is_some() {
state.assigned_partitions.clear();
} else {
state.assigned_partitions = state
.subscribed_topics
.iter()
.cloned()
.map(|topic| (topic, 0))
.collect();
}
}
state.positions.clear();
state.rebalance_generation = 0;
state.last_revoked_partitions.clear();
drop(state);
self.state_notify.notify_waiters();
Ok(())
}
#[allow(clippy::too_many_lines)]
pub async fn rebalance(
&self,
cx: &Cx,
assignments: &[TopicPartitionOffset],
) -> Result<RebalanceResult, KafkaError> {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
self.ensure_open()?;
if broker_operations_feature_disabled_for_build() {
return Err(KafkaError::FeatureDisabled);
}
#[cfg(test)]
let rebalance_after_open_hook = self.rebalance_after_open_hook.lock().clone();
#[cfg(test)]
if let Some(hook) = rebalance_after_open_hook {
hook.arrived.wait();
hook.release.wait();
}
let mut normalized = BTreeMap::new();
let (next_assignments, assigned, revoked) = {
let state = self.state.lock();
if self.closed.load(Ordering::Acquire) {
return Err(KafkaError::Config("consumer is closed".to_string()));
}
if state.subscribed_topics.is_empty() {
return Err(KafkaError::Config(
"consumer has no active topic subscription".to_string(),
));
}
for tpo in assignments {
if tpo.topic.trim().is_empty() {
return Err(KafkaError::Config("topic cannot be empty".to_string()));
}
validate_partition_number(tpo.partition)?;
if !state.subscribed_topics.contains(&tpo.topic) {
return Err(KafkaError::InvalidTopic(tpo.topic.clone()));
}
if tpo.offset < 0 {
return Err(KafkaError::Config(
"rebalance offsets must be non-negative".to_string(),
));
}
if normalized
.insert((tpo.topic.clone(), tpo.partition), tpo.offset)
.is_some()
{
return Err(KafkaError::Config(
"duplicate topic/partition entry in rebalance batch".to_string(),
));
}
}
let previous_assignments = state.assigned_partitions.clone();
let next_assignments: BTreeSet<(String, i32)> = normalized.keys().cloned().collect();
let revoked: Vec<(String, i32)> = previous_assignments
.difference(&next_assignments)
.cloned()
.collect();
let assigned: Vec<(String, i32)> = next_assignments.iter().cloned().collect();
drop(state);
(next_assignments, assigned, revoked)
};
#[cfg(feature = "kafka")]
if let Some((consumer, broker_ops)) = self.broker_backend() {
let assignment_list: Vec<TopicPartitionOffset> = assignments.to_vec();
crate::runtime::spawn_blocking::spawn_blocking_on_thread(move || {
let _guard = broker_ops.lock();
if assignment_list.is_empty() {
consumer.unassign().map_err(map_consumer_error)
} else {
let mut tpl = TopicPartitionList::new();
for tpo in &assignment_list {
tpl.add_partition_offset(
&tpo.topic,
tpo.partition,
Offset::Offset(tpo.offset),
)
.map_err(map_consumer_error)?;
}
consumer.assign(&tpl).map_err(map_consumer_error)
}
})
.await?;
}
let mut state = self.state.lock();
if self.closed.load(Ordering::Acquire) {
return Err(KafkaError::Config("consumer is closed".to_string()));
}
state.assigned_partitions = next_assignments;
let retained_assignments = state.assigned_partitions.clone();
state
.positions
.retain(|key, _| retained_assignments.contains(key));
for (partition, offset) in normalized {
state.positions.insert(partition, offset);
}
state.rebalance_generation = state.rebalance_generation.saturating_add(1);
state.last_revoked_partitions = revoked.iter().cloned().collect();
let generation = state.rebalance_generation;
drop(state);
self.state_notify.notify_waiters();
Ok(RebalanceResult {
generation,
assigned,
revoked,
})
}
#[allow(unused_variables, clippy::too_many_lines)]
pub async fn poll(
&self,
cx: &Cx,
timeout: Duration,
) -> Result<Option<ConsumerRecord>, KafkaError> {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
self.ensure_open()?;
if broker_operations_feature_disabled_for_build() {
return Err(KafkaError::FeatureDisabled);
}
self.ensure_has_subscription()?;
#[cfg(feature = "kafka")]
{
if let Some((consumer, broker_ops)) = self.broker_backend() {
let auto_commit = self.config.enable_auto_commit;
let now_fn = || {
cx.timer_driver()
.map_or_else(crate::time::wall_now, |d| d.now())
};
let deadline = now_fn().saturating_add_nanos(duration_to_nanos(timeout));
let mut first_iteration = true;
loop {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
let now = now_fn();
if !first_iteration && now >= deadline {
return Ok(None);
}
let buffered_res = self.buffered_outcome.lock().take();
if let Some(res) = buffered_res {
match res {
Ok(outcome) => {
let dropped_record_for_revoked: bool = {
let mut state = self.state.lock();
apply_broker_snapshot(&mut state, outcome.snapshot);
if let Some(ref rec) = outcome.record {
let owned = state
.assigned_partitions
.contains(&(rec.topic.clone(), rec.partition));
!owned
} else {
false
}
};
if !dropped_record_for_revoked {
if let Some(record) = outcome.record {
return Ok(Some(record));
}
}
}
Err(e) => return Err(e),
}
if timeout.is_zero() {
return Ok(None);
}
first_iteration = false;
continue;
}
let wait_for = if timeout.is_zero() {
Duration::ZERO
} else {
let remaining = Duration::from_nanos(deadline.duration_since(now));
remaining.min(MAX_BROKER_POLL_SLICE)
};
let outcome_res = crate::runtime::spawn_blocking::spawn_blocking_on_thread({
let consumer = Arc::clone(&consumer);
let broker_ops = Arc::clone(&broker_ops);
let buffered_outcome = Arc::clone(&self.buffered_outcome);
move || -> Result<(), KafkaError> {
let _guard = broker_ops.lock();
let record = match consumer.poll(wait_for) {
Some(Ok(message)) => {
if auto_commit {
if let Err(e) = consumer
.store_offset_from_message(&message)
.map_err(map_consumer_error)
{
*buffered_outcome.lock() = Some(Err(e));
return Ok(());
}
}
Some(consumer_record_from_message(&message))
}
Some(Err(
RdKafkaError::NoMessageReceived | RdKafkaError::PartitionEOF(_),
))
| None => None,
Some(Err(err)) => {
*buffered_outcome.lock() = Some(Err(map_consumer_error(err)));
return Ok(());
}
};
let snapshot = match capture_broker_snapshot(&consumer) {
Ok(s) => s,
Err(e) => {
*buffered_outcome.lock() = Some(Err(e));
return Ok(());
}
};
*buffered_outcome.lock() =
Some(Ok(BrokerPollOutcome { record, snapshot }));
Ok(())
}
})
.await;
outcome_res?;
}
}
}
#[cfg(feature = "kafka")]
{
if timeout.is_zero() {
return Ok(None);
}
let now_fn = || {
cx.timer_driver()
.map_or_else(crate::time::wall_now, |d| d.now())
};
let deadline = now_fn().saturating_add_nanos(duration_to_nanos(timeout));
loop {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
let mut state_wait = self.state_notify.notified();
let mut sleep = Sleep::new(deadline);
self.ensure_open()?;
self.ensure_has_subscription()?;
if now_fn() >= deadline {
return Ok(None);
}
() = std::future::poll_fn(|task_cx| {
if crate::cx::Cx::with_current(|c| c.checkpoint().is_err()).unwrap_or(false) {
return Poll::Ready(());
}
if Pin::new(&mut sleep).poll(task_cx).is_ready() {
return Poll::Ready(());
}
if Pin::new(&mut state_wait).poll(task_cx).is_ready() {
return Poll::Ready(());
}
Poll::Pending
})
.await;
}
}
#[cfg(all(test, not(feature = "kafka")))]
{
if let Some(record) = self.try_poll_local_record()? {
return Ok(Some(record));
}
if timeout.is_zero() {
return Ok(None);
}
let now_fn = || {
cx.timer_driver()
.map_or_else(crate::time::wall_now, |d| d.now())
};
let deadline = now_fn().saturating_add_nanos(duration_to_nanos(timeout));
loop {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
#[cfg(all(test, not(feature = "kafka")))]
let poll_before_wait_hook = self.poll_before_wait_hook.lock().clone();
#[cfg(all(test, not(feature = "kafka")))]
if let Some(hook) = poll_before_wait_hook {
hook.arrived.wait();
hook.release.wait();
}
let mut state_wait = self.state_notify.notified();
let mut broker_wait = deterministic_broker_notify().notified();
let mut sleep = Sleep::new(deadline);
self.ensure_open()?;
self.ensure_has_subscription()?;
if let Some(record) = self.try_poll_local_record()? {
return Ok(Some(record));
}
if now_fn() >= deadline {
return Ok(None);
}
() = std::future::poll_fn(|task_cx| {
if crate::cx::Cx::with_current(|c| c.checkpoint().is_err()).unwrap_or(false) {
return Poll::Ready(());
}
if Pin::new(&mut sleep).poll(task_cx).is_ready() {
return Poll::Ready(());
}
if Pin::new(&mut state_wait).poll(task_cx).is_ready() {
return Poll::Ready(());
}
if Pin::new(&mut broker_wait).poll(task_cx).is_ready() {
return Poll::Ready(());
}
Poll::Pending
})
.await;
}
}
#[cfg(all(not(test), not(feature = "kafka")))]
{
Err(KafkaError::FeatureDisabled)
}
}
#[allow(unused_variables)]
pub async fn commit_offsets(
&self,
cx: &Cx,
offsets: &[TopicPartitionOffset],
) -> Result<(), KafkaError> {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
self.ensure_open()?;
if broker_operations_feature_disabled_for_build() {
return Err(KafkaError::FeatureDisabled);
}
if offsets.is_empty() {
return Err(KafkaError::Config("offsets cannot be empty".to_string()));
}
let mut normalized = BTreeMap::new();
{
let state = self.state.lock();
if self.closed.load(Ordering::Acquire) {
return Err(KafkaError::Config("consumer is closed".to_string()));
}
for tpo in offsets {
validate_partition_number(tpo.partition)?;
if !state.subscribed_topics.contains(&tpo.topic) {
return Err(KafkaError::InvalidTopic(tpo.topic.clone()));
}
let key = (tpo.topic.clone(), tpo.partition);
if !state.assigned_partitions.contains(&key) {
return Err(KafkaError::Config(
"partition is not assigned to this consumer".to_string(),
));
}
if tpo.offset < 0 {
return Err(KafkaError::Config(
"offsets must be non-negative".to_string(),
));
}
if let Some(previous) = state.committed_offsets.get(&key)
&& tpo.offset < *previous
{
return Err(KafkaError::Config(
"offset commit regression is not allowed".to_string(),
));
}
if normalized.insert(key, tpo.offset).is_some() {
return Err(KafkaError::Config(
"duplicate topic/partition entry in commit batch".to_string(),
));
}
}
drop(state);
}
#[cfg(feature = "kafka")]
if let Some((consumer, broker_ops)) = self.broker_backend() {
let commit_batch = normalized.clone();
let config = &self.config;
retry_consumer_operation(cx, config, || {
let commit_batch = &commit_batch;
let consumer = &consumer;
let broker_ops = &broker_ops;
std::thread::scope(|scope| {
scope
.spawn(|| {
let _guard = broker_ops.lock();
let mut tpl = TopicPartitionList::new();
for ((topic, partition), offset) in commit_batch {
tpl.add_partition_offset(
topic,
*partition,
Offset::Offset(*offset),
)
.map_err(map_consumer_error)?;
}
consumer
.commit(&tpl, CommitMode::Sync)
.map_err(map_consumer_error)
})
.join()
.unwrap()
})
})
.await?;
}
let mut state = self.state.lock();
if self.closed.load(Ordering::Acquire) {
return Err(KafkaError::Config("consumer is closed".to_string()));
}
for (key, offset) in normalized {
state.committed_offsets.insert(key, offset);
}
drop(state);
self.state_notify.notify_waiters();
Ok(())
}
#[allow(unused_variables)]
pub async fn seek(&self, cx: &Cx, tpo: &TopicPartitionOffset) -> Result<(), KafkaError> {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
self.ensure_open()?;
if broker_operations_feature_disabled_for_build() {
return Err(KafkaError::FeatureDisabled);
}
validate_partition_number(tpo.partition)?;
if tpo.offset < 0 {
return Err(KafkaError::Config(
"seek offset must be non-negative".to_string(),
));
}
{
let state = self.state.lock();
if self.closed.load(Ordering::Acquire) {
return Err(KafkaError::Config("consumer is closed".to_string()));
}
if !state.subscribed_topics.contains(&tpo.topic) {
return Err(KafkaError::InvalidTopic(tpo.topic.clone()));
}
if !state
.assigned_partitions
.contains(&(tpo.topic.clone(), tpo.partition))
{
return Err(KafkaError::Config(
"partition is not assigned to this consumer".to_string(),
));
}
}
#[cfg(feature = "kafka")]
if let Some((consumer, broker_ops)) = self.broker_backend() {
let topic = tpo.topic.clone();
let partition = tpo.partition;
let offset = tpo.offset;
crate::runtime::spawn_blocking::spawn_blocking_on_thread(move || {
let _guard = broker_ops.lock();
consumer
.seek(
&topic,
partition,
Offset::Offset(offset),
Duration::from_secs(1),
)
.map_err(map_consumer_error)
})
.await?;
}
let mut state = self.state.lock();
if self.closed.load(Ordering::Acquire) {
return Err(KafkaError::Config("consumer is closed".to_string()));
}
state
.positions
.insert((tpo.topic.clone(), tpo.partition), tpo.offset);
drop(state);
self.state_notify.notify_waiters();
Ok(())
}
#[allow(unused_variables)]
pub async fn close(&self, cx: &Cx) -> Result<(), KafkaError> {
cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
let was_closed = self.closed.swap(true, Ordering::AcqRel);
if !was_closed {
#[cfg(feature = "kafka")]
if let Some((consumer, broker_ops)) = self.broker_backend() {
crate::runtime::spawn_blocking::spawn_blocking_on_thread(move || {
let _guard = broker_ops.lock();
consumer.unsubscribe();
consumer.unassign().map_err(map_consumer_error)
})
.await?;
}
let mut state = self.state.lock();
state.subscribed_topics.clear();
state.assigned_partitions.clear();
state.committed_offsets.clear();
state.positions.clear();
state.last_revoked_partitions.clear();
drop(state);
self.state_notify.notify_waiters();
}
Ok(())
}
#[must_use]
pub const fn config(&self) -> &ConsumerConfig {
&self.config
}
#[must_use]
pub fn subscriptions(&self) -> Vec<String> {
self.state
.lock()
.subscribed_topics
.iter()
.cloned()
.collect()
}
#[must_use]
pub fn assigned_partitions(&self) -> Vec<(String, i32)> {
self.state
.lock()
.assigned_partitions
.iter()
.cloned()
.collect()
}
#[must_use]
pub fn rebalance_generation(&self) -> u64 {
self.state.lock().rebalance_generation
}
#[must_use]
pub fn last_revoked_partitions(&self) -> Vec<(String, i32)> {
self.state
.lock()
.last_revoked_partitions
.iter()
.cloned()
.collect()
}
#[must_use]
pub fn committed_offset(&self, topic: &str, partition: i32) -> Option<i64> {
self.state
.lock()
.committed_offsets
.get(&(topic.to_string(), partition))
.copied()
}
#[must_use]
pub fn position(&self, topic: &str, partition: i32) -> Option<i64> {
self.state
.lock()
.positions
.get(&(topic.to_string(), partition))
.copied()
}
#[must_use]
pub fn is_closed(&self) -> bool {
self.closed.load(Ordering::Acquire)
}
fn ensure_open(&self) -> Result<(), KafkaError> {
if self.closed.load(Ordering::Acquire) {
Err(KafkaError::Config("consumer is closed".to_string()))
} else {
Ok(())
}
}
fn ensure_has_subscription(&self) -> Result<(), KafkaError> {
let state = self.state.lock();
if self.closed.load(Ordering::Acquire) {
return Err(KafkaError::Config("consumer is closed".to_string()));
}
if state.subscribed_topics.is_empty() {
return Err(KafkaError::Config(
"consumer has no active topic subscription".to_string(),
));
}
drop(state);
Ok(())
}
#[cfg(all(test, not(feature = "kafka")))]
fn try_poll_local_record(&self) -> Result<Option<ConsumerRecord>, KafkaError> {
let mut state = self.state.lock();
if self.closed.load(Ordering::Acquire) {
return Err(KafkaError::Config("consumer is closed".to_string()));
}
if state.subscribed_topics.is_empty() {
return Err(KafkaError::Config(
"consumer has no active topic subscription".to_string(),
));
}
let assignments: Vec<(String, i32)> = state.assigned_partitions.iter().cloned().collect();
if assignments.is_empty() {
drop(state);
return Ok(None);
}
let start = state.poll_cursor % assignments.len();
for step in 0..assignments.len() {
let index = (start + step) % assignments.len();
let (topic, partition) = &assignments[index];
let offset =
Self::current_position_for_partition(&self.config, &mut state, topic, *partition)?;
if let Some(record) = deterministic_broker_fetch(topic, *partition, offset) {
state
.positions
.insert((topic.clone(), *partition), offset.saturating_add(1));
state.poll_cursor = (index + 1) % assignments.len();
drop(state);
return Ok(Some(ConsumerRecord {
topic: record.topic,
partition: record.partition,
offset,
key: record.key,
payload: record.payload,
timestamp: record.timestamp,
headers: record.headers,
}));
}
}
drop(state);
Ok(None)
}
#[cfg(all(test, not(feature = "kafka")))]
fn current_position_for_partition(
config: &ConsumerConfig,
state: &mut ConsumerState,
topic: &str,
partition: i32,
) -> Result<i64, KafkaError> {
let key = (topic.to_string(), partition);
if let Some(position) = state.positions.get(&key) {
return Ok(*position);
}
if let Some(committed) = state.committed_offsets.get(&key) {
state.positions.insert(key, *committed);
return Ok(*committed);
}
let initial_offset = match config.auto_offset_reset {
AutoOffsetReset::Earliest => 0,
AutoOffsetReset::Latest => deterministic_broker_end_offset(topic, partition),
AutoOffsetReset::None => {
return Err(KafkaError::Config(format!(
"no offset available for {topic}[{partition}] and auto_offset_reset is None"
)));
}
};
state.positions.insert(key, initial_offset);
Ok(initial_offset)
}
}
fn validate_partition_number(partition: i32) -> Result<(), KafkaError> {
if partition < 0 {
Err(KafkaError::Config(
"partition must be non-negative".to_string(),
))
} else {
Ok(())
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap,
clippy::future_not_send
)]
use super::*;
#[cfg(not(feature = "kafka"))]
use crate::messaging::kafka::{
DeterministicBrokerTestGuard, KafkaProducer, ProducerConfig,
lock_deterministic_broker_for_tests,
};
use crate::test_utils::run_test_with_cx;
#[cfg(feature = "kafka")]
use rdkafka::topic_partition_list::Offset;
use std::sync::Arc;
#[cfg(not(feature = "kafka"))]
use std::time::Instant;
#[cfg(not(feature = "kafka"))]
fn deterministic_broker_guard() -> DeterministicBrokerTestGuard {
lock_deterministic_broker_for_tests()
}
#[cfg(feature = "kafka")]
#[test]
fn broker_snapshot_update_tracks_generation_and_revocations() {
let mut state = ConsumerState::default();
apply_broker_snapshot(
&mut state,
broker_snapshot_from_topic_maps(
BTreeSet::from([("orders".to_string(), 0), ("orders".to_string(), 1)]),
BTreeMap::from([
(("orders".to_string(), 0), 4),
(("orders".to_string(), 1), 8),
]),
),
);
assert_eq!(state.rebalance_generation, 1);
assert_eq!(state.last_revoked_partitions.len(), 0);
assert_eq!(state.positions.get(&("orders".to_string(), 1)), Some(&8));
apply_broker_snapshot(
&mut state,
broker_snapshot_from_topic_maps(
BTreeSet::from([("orders".to_string(), 1)]),
BTreeMap::from([(("orders".to_string(), 1), 9)]),
),
);
assert_eq!(state.rebalance_generation, 2);
assert_eq!(
state.last_revoked_partitions,
BTreeSet::from([("orders".to_string(), 0)])
);
assert_eq!(state.positions.get(&("orders".to_string(), 1)), Some(&9));
assert!(!state.positions.contains_key(&("orders".to_string(), 0)));
}
#[cfg(feature = "kafka")]
#[test]
fn offset_from_rdkafka_only_keeps_absolute_offsets() {
assert_eq!(offset_from_rdkafka(Offset::Offset(7)), Some(7));
assert_eq!(offset_from_rdkafka(Offset::Offset(-1)), None);
assert_eq!(offset_from_rdkafka(Offset::Beginning), None);
assert_eq!(offset_from_rdkafka(Offset::End), None);
assert_eq!(offset_from_rdkafka(Offset::Stored), None);
assert_eq!(offset_from_rdkafka(Offset::Invalid), None);
}
#[test]
fn test_config_defaults() {
let config = ConsumerConfig::default();
assert_eq!(config.group_id, "asupersync-default");
assert_eq!(config.max_poll_records, 500);
assert!(
!config.enable_auto_commit,
"default must be manual-commit / at-least-once"
);
}
#[test]
fn test_config_builder() {
let config = ConsumerConfig::new(vec!["kafka:9092".to_string()], "group-1")
.client_id("consumer-1")
.auto_offset_reset(AutoOffsetReset::Earliest)
.enable_auto_commit(false)
.max_poll_records(1000)
.fetch_min_bytes(4)
.fetch_max_bytes(1024)
.isolation_level(IsolationLevel::ReadCommitted);
assert_eq!(config.bootstrap_servers, vec!["kafka:9092"]);
assert_eq!(config.group_id, "group-1");
assert_eq!(config.client_id, Some("consumer-1".to_string()));
assert_eq!(config.auto_offset_reset, AutoOffsetReset::Earliest);
assert!(!config.enable_auto_commit);
assert_eq!(config.max_poll_records, 1000);
assert_eq!(config.fetch_min_bytes, 4);
assert_eq!(config.fetch_max_bytes, 1024);
assert_eq!(config.isolation_level, IsolationLevel::ReadCommitted);
assert!(!config.allow_insecure_transport_for_testing);
assert_eq!(config.security, KafkaSecurityConfig::Plaintext);
}
#[test]
fn test_config_validation() {
let empty_servers = ConsumerConfig {
bootstrap_servers: vec![],
..Default::default()
};
assert!(empty_servers.validate().is_err());
let empty_group = ConsumerConfig::new(vec!["kafka:9092".to_string()], "");
assert!(empty_group.validate().is_err());
let bad_fetch = ConsumerConfig::new(vec!["kafka:9092".to_string()], "group")
.fetch_min_bytes(10)
.fetch_max_bytes(1);
assert!(bad_fetch.validate().is_err());
let remote_plaintext =
ConsumerConfig::new(vec!["broker.example.com:9092".to_string()], "group");
let err = remote_plaintext.validate().expect_err(
"remote non-loopback bootstrap servers must fail closed until TLS/SASL support lands",
);
assert!(matches!(err, KafkaError::Config(msg) if msg.contains("TLS or SASL_SSL")));
let explicit_insecure =
ConsumerConfig::new(vec!["broker.example.com:9092".to_string()], "group")
.allow_insecure_transport_for_testing(true);
assert!(explicit_insecure.validate().is_ok());
let tls = ConsumerConfig::new(vec!["broker.example.com:9092".to_string()], "group")
.tls(KafkaTlsConfig::new().ca_location("/etc/ssl/certs"));
assert!(tls.validate().is_ok());
let sasl = ConsumerConfig::new(vec!["broker.example.com:9092".to_string()], "group")
.sasl_scram_sha_256("service-user", "top-secret");
assert!(sasl.validate().is_ok());
let bad_sasl = ConsumerConfig::new(vec!["broker.example.com:9092".to_string()], "group")
.sasl_scram_sha_512("service-user", "");
let err = bad_sasl
.validate()
.expect_err("blank SASL password must fail closed");
assert!(matches!(err, KafkaError::Config(msg) if msg.contains("password")));
let debug = format!("{sasl:?}");
assert!(debug.contains("<redacted>"));
assert!(!debug.contains("top-secret"));
}
#[test]
fn test_topic_partition_offset() {
let tpo = TopicPartitionOffset::new("topic", 1, 42);
assert_eq!(tpo.topic, "topic");
assert_eq!(tpo.partition, 1);
assert_eq!(tpo.offset, 42);
}
#[test]
fn test_consumer_creation() {
let config = ConsumerConfig::default();
let consumer = KafkaConsumer::new(config);
assert!(consumer.is_ok());
}
#[test]
fn auto_offset_reset_default() {
let d = AutoOffsetReset::default();
assert_eq!(d, AutoOffsetReset::Latest);
}
#[test]
fn auto_offset_reset_debug_copy_eq() {
let e = AutoOffsetReset::Earliest;
let dbg = format!("{e:?}");
assert!(dbg.contains("Earliest"));
let e2 = e;
assert_eq!(e, e2);
let e3 = e;
assert_eq!(e, e3);
assert_ne!(AutoOffsetReset::Earliest, AutoOffsetReset::Latest);
assert_ne!(AutoOffsetReset::Latest, AutoOffsetReset::None);
}
#[test]
fn isolation_level_default() {
let d = IsolationLevel::default();
assert_eq!(d, IsolationLevel::ReadUncommitted);
}
#[test]
fn isolation_level_debug_copy_eq() {
let rc = IsolationLevel::ReadCommitted;
let dbg = format!("{rc:?}");
assert!(dbg.contains("ReadCommitted"));
let rc2 = rc;
assert_eq!(rc, rc2);
assert_ne!(
IsolationLevel::ReadCommitted,
IsolationLevel::ReadUncommitted
);
}
#[test]
fn consumer_config_debug_clone() {
let cfg = ConsumerConfig::default();
let dbg = format!("{cfg:?}");
assert!(dbg.contains("asupersync-default"));
let cloned = cfg;
assert_eq!(cloned.group_id, "asupersync-default");
}
#[test]
fn consumer_config_new_overrides_defaults() {
let cfg = ConsumerConfig::new(vec!["broker:9092".into()], "my-group");
assert_eq!(cfg.bootstrap_servers, vec!["broker:9092"]);
assert_eq!(cfg.group_id, "my-group");
assert_eq!(cfg.max_poll_records, 500);
assert!(!cfg.enable_auto_commit);
}
#[test]
fn consumer_config_session_timeout_builder() {
let cfg = ConsumerConfig::default().session_timeout(Duration::from_secs(60));
assert_eq!(cfg.session_timeout, Duration::from_secs(60));
}
#[test]
fn consumer_config_heartbeat_builder() {
let cfg = ConsumerConfig::default().heartbeat_interval(Duration::from_secs(10));
assert_eq!(cfg.heartbeat_interval, Duration::from_secs(10));
}
#[test]
fn consumer_config_auto_commit_interval_builder() {
let cfg = ConsumerConfig::default().auto_commit_interval(Duration::from_secs(15));
assert_eq!(cfg.auto_commit_interval, Duration::from_secs(15));
}
#[test]
fn consumer_config_fetch_max_wait_builder() {
let cfg = ConsumerConfig::default().fetch_max_wait(Duration::from_secs(1));
assert_eq!(cfg.fetch_max_wait, Duration::from_secs(1));
}
#[test]
fn consumer_config_validate_zero_poll_records() {
let cfg = ConsumerConfig::default().max_poll_records(0);
assert!(cfg.validate().is_err());
}
#[test]
fn consumer_config_validate_whitespace_group() {
let cfg = ConsumerConfig::new(vec!["kafka:9092".into()], " ");
assert!(cfg.validate().is_err());
}
#[test]
fn consumer_config_validate_ok() {
let cfg = ConsumerConfig::default();
assert!(cfg.validate().is_ok());
}
#[test]
fn consumer_config_allows_loopback_ipv4_and_ipv6_without_insecure_opt_in() {
let ipv4 = ConsumerConfig::new(vec!["127.0.0.1:9092".into()], "group");
assert!(ipv4.validate().is_ok());
let ipv6 = ConsumerConfig::new(vec!["[::1]:9092".into()], "group");
assert!(ipv6.validate().is_ok());
}
#[test]
fn topic_partition_offset_debug_clone_eq() {
let tpo = TopicPartitionOffset::new("events", 0, 100);
let dbg = format!("{tpo:?}");
assert!(dbg.contains("events"));
assert!(dbg.contains("100"));
let cloned = tpo.clone();
assert_eq!(tpo, cloned);
}
#[test]
fn topic_partition_offset_inequality() {
let a = TopicPartitionOffset::new("t1", 0, 0);
let b = TopicPartitionOffset::new("t2", 0, 0);
assert_ne!(a, b);
}
#[test]
fn consumer_record_debug_clone() {
let rec = ConsumerRecord {
topic: "test-topic".into(),
partition: 3,
offset: 42,
key: Some(b"key".to_vec()),
payload: b"value".to_vec(),
timestamp: Some(1000),
headers: vec![("h1".into(), b"v1".to_vec())],
};
let dbg = format!("{rec:?}");
assert!(dbg.contains("test-topic"));
assert!(dbg.contains("42"));
let cloned = rec;
assert_eq!(cloned.topic, "test-topic");
assert_eq!(cloned.partition, 3);
assert_eq!(cloned.key, Some(b"key".to_vec()));
}
#[test]
fn consumer_record_no_key_no_timestamp() {
let rec = ConsumerRecord {
topic: "t".into(),
partition: 0,
offset: 0,
key: None,
payload: vec![],
timestamp: None,
headers: vec![],
};
assert!(rec.key.is_none());
assert!(rec.timestamp.is_none());
}
#[test]
fn kafka_consumer_debug_config_accessor() {
let cfg = ConsumerConfig::default();
let consumer = KafkaConsumer::new(cfg).unwrap();
let dbg = format!("{consumer:?}");
assert!(dbg.contains("KafkaConsumer"));
assert_eq!(consumer.config().group_id, "asupersync-default");
}
#[test]
fn kafka_consumer_rejects_invalid_config() {
let cfg = ConsumerConfig {
bootstrap_servers: vec![],
..Default::default()
};
assert!(KafkaConsumer::new(cfg).is_err());
}
#[test]
fn auto_offset_reset_debug_clone_copy_eq_default() {
let a = AutoOffsetReset::default();
assert_eq!(a, AutoOffsetReset::Latest);
let b = a; let c = a;
assert_eq!(a, b);
assert_eq!(a, c);
assert_ne!(a, AutoOffsetReset::Earliest);
assert_ne!(a, AutoOffsetReset::None);
let dbg = format!("{a:?}");
assert!(dbg.contains("Latest"));
}
#[test]
fn isolation_level_debug_clone_copy_eq_default() {
let a = IsolationLevel::default();
assert_eq!(a, IsolationLevel::ReadUncommitted);
let b = a; let c = a;
assert_eq!(a, b);
assert_eq!(a, c);
assert_ne!(a, IsolationLevel::ReadCommitted);
let dbg = format!("{a:?}");
assert!(dbg.contains("ReadUncommitted"));
}
#[test]
fn consumer_config_debug_clone_default() {
let cfg = ConsumerConfig::default();
let cloned = cfg.clone();
assert_eq!(cloned.group_id, "asupersync-default");
assert_eq!(cloned.auto_offset_reset, AutoOffsetReset::Latest);
assert_eq!(cloned.isolation_level, IsolationLevel::ReadUncommitted);
let dbg = format!("{cfg:?}");
assert!(dbg.contains("ConsumerConfig"));
}
#[test]
fn consumer_subscribe_tracks_assignments() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
consumer
.subscribe(&cx, &["orders", "orders", "payments"])
.await
.unwrap();
assert_eq!(
consumer.subscriptions(),
vec!["orders".to_string(), "payments".to_string()]
);
assert_eq!(
consumer.assigned_partitions(),
vec![("orders".to_string(), 0), ("payments".to_string(), 0)]
);
assert!(
consumer
.poll(&cx, Duration::from_millis(1))
.await
.unwrap()
.is_none()
);
});
}
#[test]
fn consumer_commit_and_seek_track_offsets() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
consumer.subscribe(&cx, &["orders"]).await.unwrap();
consumer
.commit_offsets(&cx, &[TopicPartitionOffset::new("orders", 0, 7)])
.await
.unwrap();
assert_eq!(consumer.committed_offset("orders", 0), Some(7));
consumer
.seek(&cx, &TopicPartitionOffset::new("orders", 0, 42))
.await
.unwrap();
assert_eq!(consumer.position("orders", 0), Some(42));
let missing = consumer
.commit_offsets(&cx, &[TopicPartitionOffset::new("missing", 0, 1)])
.await
.unwrap_err();
assert!(matches!(missing, KafkaError::InvalidTopic(topic) if topic == "missing"));
let negative = consumer
.seek(&cx, &TopicPartitionOffset::new("orders", 0, -1))
.await
.unwrap_err();
assert!(matches!(negative, KafkaError::Config(msg) if msg.contains("non-negative")));
});
}
#[test]
fn consumer_close_is_idempotent_and_blocks_operations() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
consumer.subscribe(&cx, &["orders"]).await.unwrap();
consumer.close(&cx).await.unwrap();
consumer.close(&cx).await.unwrap();
assert!(consumer.is_closed());
let err = consumer
.commit_offsets(&cx, &[TopicPartitionOffset::new("orders", 0, 1)])
.await
.unwrap_err();
assert!(matches!(err, KafkaError::Config(msg) if msg.contains("closed")));
let seek_err = consumer
.seek(&cx, &TopicPartitionOffset::new("orders", 0, 42))
.await
.unwrap_err();
assert!(matches!(seek_err, KafkaError::Config(msg) if msg.contains("closed")));
});
}
#[test]
fn consumer_rejects_empty_topic_entries() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
let err = consumer.subscribe(&cx, &["orders", ""]).await.unwrap_err();
assert!(
matches!(err, KafkaError::Config(msg) if msg.contains("topic cannot be empty"))
);
});
}
#[test]
fn consumer_rebalance_tracks_assignment_and_revocation() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
consumer
.subscribe(&cx, &["orders", "payments"])
.await
.unwrap();
let result = consumer
.rebalance(
&cx,
&[
TopicPartitionOffset::new("orders", 1, 10),
TopicPartitionOffset::new("orders", 2, 0),
],
)
.await
.unwrap();
assert_eq!(result.generation, 1);
assert_eq!(
result.assigned,
vec![("orders".to_string(), 1), ("orders".to_string(), 2)]
);
assert_eq!(
result.revoked,
vec![("orders".to_string(), 0), ("payments".to_string(), 0)]
);
assert_eq!(consumer.position("orders", 1), Some(10));
assert_eq!(consumer.position("orders", 2), Some(0));
assert_eq!(consumer.rebalance_generation(), 1);
assert_eq!(
consumer.last_revoked_partitions(),
vec![("orders".to_string(), 0), ("payments".to_string(), 0)]
);
});
}
#[test]
fn consumer_rebalance_rejects_duplicate_partition_entries() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
consumer
.subscribe(&cx, &["orders", "payments"])
.await
.unwrap();
let err = consumer
.rebalance(
&cx,
&[
TopicPartitionOffset::new("orders", 1, 10),
TopicPartitionOffset::new("orders", 1, 25),
],
)
.await
.unwrap_err();
assert!(matches!(err, KafkaError::Config(msg) if msg.contains("duplicate")));
assert_eq!(
consumer.assigned_partitions(),
vec![("orders".to_string(), 0), ("payments".to_string(), 0)]
);
assert_eq!(consumer.rebalance_generation(), 0);
assert!(consumer.last_revoked_partitions().is_empty());
assert_eq!(consumer.position("orders", 1), None);
});
}
#[test]
fn consumer_rebalance_rejects_close_race_after_open() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = Arc::new(KafkaConsumer::new(ConsumerConfig::default()).unwrap());
consumer.subscribe(&cx, &["orders"]).await.unwrap();
let hook = Arc::new(RebalanceAfterOpenHook::new());
consumer.install_rebalance_after_open_hook(Arc::clone(&hook));
let rebalance_consumer = Arc::clone(&consumer);
let rebalance_cx = cx.clone();
let handle = std::thread::spawn(move || {
futures_lite::future::block_on(
rebalance_consumer
.rebalance(&rebalance_cx, &[TopicPartitionOffset::new("orders", 1, 10)]),
)
});
hook.arrived.wait();
consumer.closed.store(true, Ordering::Release);
hook.release.wait();
let err = handle
.join()
.expect("rebalance thread panicked")
.unwrap_err();
assert!(matches!(err, KafkaError::Config(msg) if msg.contains("closed")));
assert_eq!(consumer.rebalance_generation(), 0);
assert_eq!(
consumer.assigned_partitions(),
vec![("orders".to_string(), 0)]
);
assert_eq!(consumer.position("orders", 1), None);
});
}
#[test]
fn consumer_rebalance_rejects_negative_partition_numbers() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
consumer
.subscribe(&cx, &["orders", "payments"])
.await
.unwrap();
let err = consumer
.rebalance(&cx, &[TopicPartitionOffset::new("orders", -1, 10)])
.await
.unwrap_err();
assert!(matches!(err, KafkaError::Config(msg) if msg.contains("non-negative")));
assert_eq!(
consumer.assigned_partitions(),
vec![("orders".to_string(), 0), ("payments".to_string(), 0)]
);
assert_eq!(consumer.rebalance_generation(), 0);
assert!(consumer.last_revoked_partitions().is_empty());
assert_eq!(consumer.position("orders", -1), None);
});
}
#[test]
fn consumer_commit_rejects_unassigned_partitions_and_regression() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
consumer.subscribe(&cx, &["orders"]).await.unwrap();
let unassigned = consumer
.commit_offsets(&cx, &[TopicPartitionOffset::new("orders", 1, 5)])
.await
.unwrap_err();
assert!(matches!(unassigned, KafkaError::Config(msg) if msg.contains("not assigned")));
consumer
.commit_offsets(&cx, &[TopicPartitionOffset::new("orders", 0, 8)])
.await
.unwrap();
let regression = consumer
.commit_offsets(&cx, &[TopicPartitionOffset::new("orders", 0, 7)])
.await
.unwrap_err();
assert!(matches!(regression, KafkaError::Config(msg) if msg.contains("regression")));
});
}
#[test]
fn consumer_commit_rejects_duplicate_partition_entries_in_single_batch() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
consumer.subscribe(&cx, &["orders"]).await.unwrap();
let err = consumer
.commit_offsets(
&cx,
&[
TopicPartitionOffset::new("orders", 0, 8),
TopicPartitionOffset::new("orders", 0, 7),
],
)
.await
.unwrap_err();
assert!(matches!(err, KafkaError::Config(msg) if msg.contains("duplicate")));
assert_eq!(consumer.committed_offset("orders", 0), None);
});
}
#[cfg(not(feature = "kafka"))]
#[test]
fn consumer_resubscribe_preserves_committed_offsets_across_topic_changes() {
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let topic = "consumer-resubscribe-preserves-committed-offsets";
let other_topic = "consumer-resubscribe-preserves-committed-offsets-other";
let consumer = KafkaConsumer::new(
ConsumerConfig::new(vec!["localhost:9092".to_string()], "group-resubscribe")
.auto_offset_reset(AutoOffsetReset::None),
)
.unwrap();
consumer.subscribe(&cx, &[topic]).await.unwrap();
consumer
.commit_offsets(&cx, &[TopicPartitionOffset::new(topic, 0, 7)])
.await
.unwrap();
consumer.subscribe(&cx, &[other_topic]).await.unwrap();
assert_eq!(consumer.position(topic, 0), None);
assert_eq!(consumer.committed_offset(topic, 0), Some(7));
consumer.subscribe(&cx, &[topic]).await.unwrap();
assert_eq!(consumer.committed_offset(topic, 0), Some(7));
assert!(
consumer.poll(&cx, Duration::ZERO).await.unwrap().is_none(),
"existing committed offset should satisfy auto_offset_reset=None after resubscribe"
);
assert_eq!(consumer.position(topic, 0), Some(7));
});
}
#[test]
fn consumer_commit_and_seek_reject_negative_partition_numbers() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
consumer.subscribe(&cx, &["orders"]).await.unwrap();
let commit_err = consumer
.commit_offsets(&cx, &[TopicPartitionOffset::new("orders", -1, 8)])
.await
.unwrap_err();
assert!(matches!(commit_err, KafkaError::Config(msg) if msg.contains("non-negative")));
assert_eq!(consumer.committed_offset("orders", -1), None);
let seek_err = consumer
.seek(&cx, &TopicPartitionOffset::new("orders", -1, 42))
.await
.unwrap_err();
assert!(matches!(seek_err, KafkaError::Config(msg) if msg.contains("non-negative")));
assert_eq!(consumer.position("orders", -1), None);
});
}
#[test]
fn consumer_seek_rejects_unassigned_partitions() {
#[cfg(not(feature = "kafka"))]
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let consumer = KafkaConsumer::new(ConsumerConfig::default()).unwrap();
consumer.subscribe(&cx, &["orders"]).await.unwrap();
let err = consumer
.seek(&cx, &TopicPartitionOffset::new("orders", 1, 1))
.await
.unwrap_err();
assert!(matches!(err, KafkaError::Config(msg) if msg.contains("not assigned")));
});
}
#[cfg(not(feature = "kafka"))]
#[test]
fn consumer_poll_returns_brokerless_records_and_advances_position() {
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let topic = "consumer-poll-returns-brokerless-records";
let producer = KafkaProducer::new(ProducerConfig::default()).unwrap();
let consumer = KafkaConsumer::new(
ConsumerConfig::new(vec!["localhost:9092".to_string()], "group-a")
.auto_offset_reset(AutoOffsetReset::Earliest),
)
.unwrap();
producer
.send(&cx, topic, Some(b"k1"), b"one", Some(0))
.await
.unwrap();
producer
.send(&cx, topic, Some(b"k2"), b"two", Some(0))
.await
.unwrap();
consumer.subscribe(&cx, &[topic]).await.unwrap();
let first = consumer
.poll(&cx, Duration::ZERO)
.await
.unwrap()
.expect("first record");
assert_eq!(first.topic, topic);
assert_eq!(first.partition, 0);
assert_eq!(first.offset, 0);
assert_eq!(first.key.as_deref(), Some(&b"k1"[..]));
assert_eq!(first.payload, b"one");
let second = consumer
.poll(&cx, Duration::ZERO)
.await
.unwrap()
.expect("second record");
assert_eq!(second.offset, 1);
assert_eq!(second.key.as_deref(), Some(&b"k2"[..]));
assert_eq!(second.payload, b"two");
assert_eq!(consumer.position(topic, 0), Some(2));
});
}
#[cfg(not(feature = "kafka"))]
#[test]
fn consumer_latest_offset_reset_skips_existing_backlog() {
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let topic = "consumer-latest-offset-reset-skips-existing-backlog";
let producer = KafkaProducer::new(ProducerConfig::default()).unwrap();
let consumer =
KafkaConsumer::new(ConsumerConfig::new(vec!["localhost:9092".to_string()], "g"))
.unwrap();
producer
.send(&cx, topic, None, b"existing-before-subscribe", Some(0))
.await
.unwrap();
consumer.subscribe(&cx, &[topic]).await.unwrap();
assert!(consumer.poll(&cx, Duration::ZERO).await.unwrap().is_none());
producer
.send(&cx, topic, None, b"after-subscribe", Some(0))
.await
.unwrap();
let record = consumer
.poll(&cx, Duration::ZERO)
.await
.unwrap()
.expect("post-subscribe record");
assert_eq!(record.offset, 1);
assert_eq!(record.payload, b"after-subscribe");
});
}
#[cfg(not(feature = "kafka"))]
#[test]
fn consumer_offset_reset_none_requires_existing_position() {
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let topic = "consumer-offset-reset-none-requires-existing-position";
let consumer = KafkaConsumer::new(
ConsumerConfig::new(vec!["localhost:9092".to_string()], "g")
.auto_offset_reset(AutoOffsetReset::None),
)
.unwrap();
consumer.subscribe(&cx, &[topic]).await.unwrap();
let err = consumer.poll(&cx, Duration::ZERO).await.unwrap_err();
assert!(matches!(err, KafkaError::Config(msg) if msg.contains("no offset available")));
});
}
#[cfg(not(feature = "kafka"))]
#[test]
fn consumer_poll_rechecks_brokerless_records_after_waiter_registration() {
let _broker = deterministic_broker_guard();
run_test_with_cx(|cx| async move {
let topic = "consumer-poll-rechecks-brokerless-records-after-waiter-registration";
let producer = KafkaProducer::new(ProducerConfig::default()).unwrap();
let consumer = Arc::new(
KafkaConsumer::new(
ConsumerConfig::new(vec!["localhost:9092".to_string()], "group-recheck")
.auto_offset_reset(AutoOffsetReset::Earliest),
)
.unwrap(),
);
consumer.subscribe(&cx, &[topic]).await.unwrap();
let hook = Arc::new(PollBeforeWaitHook::new());
consumer.install_poll_before_wait_hook(Arc::clone(&hook));
let poll_consumer = Arc::clone(&consumer);
let poll_cx = cx.clone();
let started = Instant::now();
let handle = std::thread::spawn(move || {
futures_lite::future::block_on(poll_consumer.poll(&poll_cx, Duration::from_secs(1)))
});
hook.arrived.wait();
producer
.send(&cx, topic, Some(b"k"), b"wake", Some(0))
.await
.unwrap();
hook.release.wait();
let record = handle
.join()
.expect("poll thread panicked")
.unwrap()
.expect("poll should return the brokerless record without sleeping until timeout");
assert_eq!(record.topic, topic);
assert_eq!(record.partition, 0);
assert_eq!(record.offset, 0);
assert_eq!(record.key.as_deref(), Some(&b"k"[..]));
assert_eq!(record.payload, b"wake");
assert!(
started.elapsed() < Duration::from_millis(400),
"poll should recheck immediately after waiter registration instead of idling until timeout"
);
});
}
#[cfg(feature = "kafka")]
#[test]
fn rebalance_toctou_drops_record_for_revoked_partition() {
let mut state = ConsumerState::default();
apply_broker_snapshot(
&mut state,
broker_snapshot_from_topic_maps(
BTreeSet::from([("topic_a".to_string(), 0), ("topic_a".to_string(), 1)]),
BTreeMap::new(),
),
);
assert!(
state
.assigned_partitions
.contains(&("topic_a".to_string(), 1))
);
let pre_gen = state.rebalance_generation;
apply_broker_snapshot(
&mut state,
broker_snapshot_from_topic_maps(
BTreeSet::from([("topic_a".to_string(), 0)]), BTreeMap::new(),
),
);
assert!(state.rebalance_generation > pre_gen, "generation must bump");
assert!(
state
.last_revoked_partitions
.contains(&("topic_a".to_string(), 1))
);
let record_topic = "topic_a".to_string();
let record_partition: i32 = 1;
let owned = state
.assigned_partitions
.contains(&(record_topic.clone(), record_partition));
assert!(
!owned,
"post-rebalance check MUST report (topic_a, 1) as NOT owned — \
record must be dropped to avoid delivering for revoked partition"
);
let owned_unrevoked = state
.assigned_partitions
.contains(&("topic_a".to_string(), 0));
assert!(
owned_unrevoked,
"(topic_a, 0) is still in the assignment — its record MUST be delivered"
);
}
#[cfg(feature = "kafka")]
#[test]
fn rebalance_toctou_keeps_record_when_no_revocation() {
let mut state = ConsumerState::default();
apply_broker_snapshot(
&mut state,
broker_snapshot_from_topic_maps(
BTreeSet::from([("topic_a".to_string(), 0)]),
BTreeMap::new(),
),
);
apply_broker_snapshot(
&mut state,
broker_snapshot_from_topic_maps(
BTreeSet::from([("topic_a".to_string(), 0)]),
BTreeMap::new(),
),
);
let owned = state
.assigned_partitions
.contains(&("topic_a".to_string(), 0));
assert!(owned, "no rebalance → (topic_a, 0) still owned");
}
#[test]
fn timer_driver_fallback_returns_nonzero_when_no_driver_attached() {
let cx = Cx::for_request();
let _ = crate::time::wall_now();
std::thread::sleep(std::time::Duration::from_millis(2));
let now_fn = || {
cx.timer_driver()
.map_or_else(crate::time::wall_now, |d| d.now())
};
let t = now_fn().as_nanos();
assert!(
t > 0,
"br-asupersync-mskwk7/6mlvbi: kafka_consumer's now_fn must \
return a non-zero value when no TimerDriverHandle is \
attached — the wall_now fallback must be reachable from \
both poll paths"
);
}
#[test]
fn timer_driver_used_when_attached() {
use crate::time::{TimerDriverHandle, VirtualClock};
let clock = std::sync::Arc::new(VirtualClock::new());
let driver = TimerDriverHandle::with_virtual_clock(clock.clone());
let driver_ref = driver.clone();
let virtual_now_via_closure = || {
let timer_opt = Some(driver_ref.clone());
timer_opt.map_or_else(crate::time::wall_now, |d| d.now())
};
clock.advance(7_000_000_000);
let observed = virtual_now_via_closure().as_nanos();
assert!(
observed >= 7_000_000_000,
"br-asupersync-mskwk7/6mlvbi: when TimerDriverHandle is \
attached, the now_fn closure must read from it \
(observed: {observed}ns; expected >= 7s)"
);
}
}