use super::*;
use tokio::sync::oneshot;
impl ConsumerRuntime {
pub(crate) fn register_poll(
&mut self,
timeout: Duration,
cancellation: Option<CancellationToken>,
reply: oneshot::Sender<ClientResult<ConsumerRecords>>,
) -> ClientResult<()> {
self.heartbeat_state.last_application_poll = Instant::now();
if cancellation
.as_ref()
.is_some_and(CancellationToken::is_cancelled)
{
let _ = reply.send(Err(Error::Cancelled));
return Ok(());
}
if self.poll_state.pending_poll.is_some() {
let _ = reply.send(Err(ConsumerError::ConcurrentPoll.into()));
return Ok(());
}
if self.poll_state.wakeup_pending {
self.poll_state.wakeup_pending = false;
let _ = reply.send(Err(ConsumerError::Wakeup.into()));
return Ok(());
}
self.poll_state.pending_poll = Some(PendingPoll {
deadline: Instant::now() + timeout,
cancellation,
reply,
});
self.maybe_complete_poll();
Ok(())
}
pub(crate) fn wakeup(&mut self) {
if self.poll_state.pending_poll.is_some() {
self.fail_pending_poll(ConsumerError::Wakeup.into());
self.poll_state.wakeup_pending = false;
return;
}
self.poll_state.wakeup_pending = true;
}
pub(crate) fn seek(&mut self, partition: TopicPartitionKey, offset: i64) -> ClientResult<()> {
if offset < 0 {
return Err(ConsumerError::InvalidSeekOffset { offset }.into());
}
let assigned = self
.assignment_state
.assignment
.get_mut(&partition)
.ok_or_else(|| ConsumerError::PartitionNotAssigned {
operation: "seek",
topic: partition.topic.clone(),
partition: partition.partition,
})?;
assigned.fetch_offset = offset;
self.poll_state
.delivered_offsets
.insert(partition.clone(), offset);
self.discard_buffered_partition(&partition);
debug!(
topic = %partition.topic,
partition = partition.partition,
offset,
"updated consumer position via seek"
);
Ok(())
}
pub(crate) async fn seek_to_timestamp(
&mut self,
partitions: Vec<crate::types::TopicPartition>,
timestamp: i64,
) -> ClientResult<()> {
let keys = self.normalize_assigned_partitions(partitions)?;
if keys.is_empty() {
return Ok(());
}
let offsets = self.lookup_offsets_at_timestamp(&keys, timestamp).await?;
for key in keys {
let offset = offsets.get(&key).copied().ok_or_else(|| {
Error::Internal(anyhow!(
"missing offset result for {}:{} during seek",
key.topic,
key.partition
))
})?;
self.seek(key, offset)?;
}
Ok(())
}
pub(crate) async fn seek_partitions_to_timestamp(
&mut self,
partitions: Vec<TopicPartitionTimestamp>,
) -> ClientResult<()> {
let normalized = self.normalize_timestamp_queries(partitions)?;
let offsets = self.lookup_offsets_for_timestamps(&normalized).await?;
for partition in normalized {
let key = TopicPartitionKey::new(partition.topic.clone(), partition.partition);
let resolved = offsets
.get(&key)
.ok_or_else(|| {
Error::Internal(anyhow!(
"missing offset result for {}:{} during seek",
key.topic,
key.partition
))
})?
.offset;
self.seek(key, resolved)?;
}
Ok(())
}
pub(crate) fn position(&self, partition: &TopicPartitionKey) -> ClientResult<i64> {
let assigned = self
.assignment_state
.assignment
.get(partition)
.ok_or_else(|| ConsumerError::PartitionNotAssigned {
operation: "position",
topic: partition.topic.clone(),
partition: partition.partition,
})?;
if let Some(offset) = self
.poll_state
.buffered_records
.iter()
.filter(|record| {
record.topic == partition.topic && record.partition == partition.partition
})
.map(|record| record.offset)
.min()
{
return Ok(offset);
}
Ok(self
.poll_state
.delivered_offsets
.get(partition)
.copied()
.unwrap_or(assigned.fetch_offset))
}
pub(crate) fn pause(
&mut self,
partitions: Vec<crate::types::TopicPartition>,
) -> ClientResult<()> {
for key in self.normalize_assigned_partitions(partitions)? {
self.assignment_state.paused_partitions.insert(key);
}
debug!(
paused = self.assignment_state.paused_partitions.len(),
"paused consumer partitions"
);
Ok(())
}
pub(crate) fn resume(
&mut self,
partitions: Vec<crate::types::TopicPartition>,
) -> ClientResult<()> {
for key in self.normalize_assigned_partitions(partitions)? {
self.assignment_state.paused_partitions.remove(&key);
}
debug!(
paused = self.assignment_state.paused_partitions.len(),
"resumed consumer partitions"
);
Ok(())
}
pub(crate) fn maybe_complete_poll(&mut self) {
let Some(pending) = self.poll_state.pending_poll.take() else {
return;
};
if pending
.cancellation
.as_ref()
.is_some_and(CancellationToken::is_cancelled)
{
let _ = pending.reply.send(Err(Error::Cancelled));
return;
}
if !self.poll_state.buffered_records.is_empty() {
let records = std::mem::take(&mut self.poll_state.buffered_records);
let record_count = records.len();
for record in &records {
self.poll_state.delivered_offsets.insert(
TopicPartitionKey::new(record.topic.clone(), record.partition),
record.offset + 1,
);
}
let _ = pending.reply.send(Ok(ConsumerRecords::new(records)));
debug!(
record_count,
"completed consumer poll with buffered records"
);
return;
}
if Instant::now() >= pending.deadline
|| (!self.has_group_subscription() && self.desired_topics().is_empty())
{
let _ = pending.reply.send(Ok(ConsumerRecords::default()));
trace!("completed consumer poll with no records");
return;
}
self.poll_state.pending_poll = Some(pending);
}
pub(crate) fn fail_pending_poll(&mut self, error: Error) {
if let Some(pending) = self.poll_state.pending_poll.take() {
let _ = pending.reply.send(Err(error));
}
}
}