use std::collections::HashMap;
use std::time::Duration;
use tokio::time::Instant;
use super::runtime::ConsumerRuntime;
use crate::types::AssignedPartition;
/// Stateless namespace type: it has no fields, and its associated functions
/// (`poll`, `next_wakeup`) take the consumer runtime by shared reference and
/// derive the next network actions / wake-up delay from it.
pub struct ConsumerRequestManagers;
impl ConsumerRequestManagers {
    /// Longest delay `next_wakeup` ever returns when no earlier deadline applies.
    const MAX_IDLE_WAKEUP: Duration = Duration::from_millis(250);

    /// Inspects `runtime` and returns the network actions that should be issued now.
    ///
    /// The function only reads `runtime`; it performs no I/O itself. The returned
    /// vector may be empty. Actions are ordered as: metadata refresh, coordinator
    /// lookup, heartbeat, commit, then one `Fetch` per partition leader (the order
    /// among `Fetch` actions follows `HashMap` iteration and is unspecified).
    ///
    /// While `runtime.lifecycle.shutting_down` is set, only the shutdown sequence
    /// (drain commits, then leave the group) is considered.
    pub fn poll(runtime: &ConsumerRuntime) -> Vec<ConsumerNetworkAction> {
        if runtime.lifecycle.shutting_down {
            return Self::poll_shutdown(runtime);
        }

        let mut actions = Vec::new();
        if Self::needs_metadata_refresh(runtime) {
            actions.push(ConsumerNetworkAction::RefreshMetadata);
        }

        // Without a coordinator connection nothing below (heartbeat, commit,
        // fetch) is attempted; at most a coordinator lookup is queued.
        if runtime.needs_coordinator() && runtime.connections.coordinator_connection.is_none() {
            Self::push_coordinator_lookup(runtime, &mut actions);
            return actions;
        }

        if runtime.has_group_subscription() && runtime.heartbeat_state.should_heartbeat() {
            actions.push(ConsumerNetworkAction::Heartbeat);
        }
        if !runtime.lifecycle.pending_commits.is_empty() {
            actions.push(ConsumerNetworkAction::Commit);
        }
        if Self::can_fetch(runtime, &actions) {
            Self::push_fetches(runtime, &mut actions);
        }
        actions
    }

    /// Returns how long the caller may wait before calling `poll` again.
    ///
    /// Returns `Duration::ZERO` when there is work that is ready immediately
    /// (queued runtime events, buffered records with a poll waiting for them,
    /// pending commits, or a pending assignment). Otherwise returns the time
    /// until the earliest of: the pending poll's deadline, the next heartbeat,
    /// the coordinator retry time, and the next auto-commit — capped at
    /// `MAX_IDLE_WAKEUP`. Deadlines already in the past yield `Duration::ZERO`.
    pub fn next_wakeup(runtime: &ConsumerRuntime) -> Duration {
        let records_ready_for_poll = !runtime.poll_state.buffered_records.is_empty()
            && runtime.poll_state.pending_poll.is_some();
        // NOTE(review): pending commits force a zero delay even when `poll`
        // emits nothing because the coordinator is missing and its retry is
        // not ready yet — confirm the driver cannot busy-spin in that state.
        if !runtime.lifecycle.runtime_events.is_empty()
            || records_ready_for_poll
            || !runtime.lifecycle.pending_commits.is_empty()
            || runtime.assignment_state.pending_assignment.is_some()
        {
            return Duration::ZERO;
        }

        // Sample the clock once so every deadline is measured against the
        // same instant.
        let now = Instant::now();
        let mut wake = Self::MAX_IDLE_WAKEUP;

        if let Some(pending_poll) = runtime.poll_state.pending_poll.as_ref() {
            wake = wake.min(pending_poll.deadline.saturating_duration_since(now));
        }
        if let Some(heartbeat_due) = runtime.heartbeat_state.next_heartbeat {
            wake = wake.min(heartbeat_due.saturating_duration_since(now));
        }
        if runtime.needs_coordinator() && runtime.connections.coordinator_connection.is_none() {
            // No recorded retry time means the lookup can be attempted right away.
            let coordinator_wake = runtime
                .connections
                .coordinator_retry_at
                .map_or(Duration::ZERO, |retry_at| {
                    retry_at.saturating_duration_since(now)
                });
            wake = wake.min(coordinator_wake);
        }
        if !runtime.lifecycle.shutting_down
            && runtime.config.enable_auto_commit
            && !runtime.poll_state.delivered_offsets.is_empty()
        {
            // `Instant + Duration` panics on overflow; with `checked_add` an
            // unrepresentably distant deadline simply does not shorten the wait.
            if let Some(auto_commit_due) = runtime
                .lifecycle
                .last_auto_commit
                .checked_add(runtime.config.auto_commit_interval)
            {
                wake = wake.min(auto_commit_due.saturating_duration_since(now));
            }
        }
        wake
    }

    /// Plans the shutdown sequence: pending commits take priority (`Commit`),
    /// and only once none remain is `LeaveGroup` emitted, and only if
    /// `member_epoch > 0`. Both steps require a coordinator connection; when it
    /// is missing, at most a coordinator lookup is returned instead.
    fn poll_shutdown(runtime: &ConsumerRuntime) -> Vec<ConsumerNetworkAction> {
        let next_step = if !runtime.lifecycle.pending_commits.is_empty() {
            ConsumerNetworkAction::Commit
        } else if runtime.heartbeat_state.member_epoch > 0 {
            ConsumerNetworkAction::LeaveGroup
        } else {
            return Vec::new();
        };

        let mut actions = Vec::new();
        if runtime.connections.coordinator_connection.is_none() {
            Self::push_coordinator_lookup(runtime, &mut actions);
        } else {
            actions.push(next_step);
        }
        actions
    }

    /// Appends `EnsureCoordinator` if `runtime.coordinator_retry_ready()`
    /// allows a lookup attempt now; otherwise leaves `actions` untouched.
    fn push_coordinator_lookup(runtime: &ConsumerRuntime, actions: &mut Vec<ConsumerNetworkAction>) {
        if runtime.coordinator_retry_ready() {
            actions.push(ConsumerNetworkAction::EnsureCoordinator);
        }
    }

    /// Reports whether a metadata refresh is warranted: metadata for the
    /// desired topics needs refreshing, a pattern subscription's metadata is
    /// stale, or a pending assignment cannot be resolved yet.
    fn needs_metadata_refresh(runtime: &ConsumerRuntime) -> bool {
        let desired_topics = runtime.desired_topics();
        let pending_assignment_unresolved = runtime
            .assignment_state
            .pending_assignment
            .as_ref()
            .is_some_and(|assignment| !runtime.can_resolve_assignment(assignment));

        let metadata = &runtime.connections.metadata;
        let max_age = runtime.config.metadata_max_age;
        let desired_topics_need_refresh =
            !desired_topics.is_empty() && metadata.needs_any_refresh(desired_topics, max_age);

        desired_topics_need_refresh
            || (runtime.is_pattern_subscription() && metadata.is_stale(max_age))
            || pending_assignment_unresolved
    }

    /// Reports whether fetches may be planned: a poll is waiting, no records
    /// are buffered, a settled (non-pending) non-empty assignment exists, and
    /// none of the already planned `actions` is a metadata refresh,
    /// coordinator lookup, or heartbeat.
    fn can_fetch(runtime: &ConsumerRuntime, actions: &[ConsumerNetworkAction]) -> bool {
        runtime.poll_state.pending_poll.is_some()
            && runtime.poll_state.buffered_records.is_empty()
            && !runtime.assignment_state.assignment.is_empty()
            && runtime.assignment_state.pending_assignment.is_none()
            // `EnsureCoordinator` cannot currently be present here because
            // `poll` returns right after queuing it; it is kept as a guard.
            && !actions.iter().any(|action| {
                matches!(
                    action,
                    ConsumerNetworkAction::RefreshMetadata
                        | ConsumerNetworkAction::EnsureCoordinator
                        | ConsumerNetworkAction::Heartbeat
                )
            })
    }

    /// Groups every non-paused assigned partition by its `leader_id` and
    /// appends one `Fetch` action per leader. Appends nothing when every
    /// partition is paused.
    fn push_fetches(runtime: &ConsumerRuntime, actions: &mut Vec<ConsumerNetworkAction>) {
        let assignment_state = &runtime.assignment_state;
        let mut by_leader = HashMap::<i32, Vec<AssignedPartition>>::new();

        let fetchable = assignment_state
            .assignment
            .values()
            .filter(|assigned| !assignment_state.paused_partitions.contains(&assigned.key));
        for assigned in fetchable {
            by_leader
                .entry(assigned.leader_id)
                .or_default()
                .push(assigned.clone());
        }

        actions.extend(by_leader.into_iter().map(|(leader_id, partitions)| {
            ConsumerNetworkAction::Fetch {
                leader_id,
                partitions,
            }
        }));
    }
}
/// A unit of network work planned by `ConsumerRequestManagers::poll`.
///
/// The variant docs below describe when `poll` emits each action; how an
/// action is executed is decided by the code that consumes the returned list.
pub enum ConsumerNetworkAction {
/// Emitted when metadata for the desired topics needs a refresh, when a
/// pattern subscription's metadata is stale, or when a pending assignment
/// cannot be resolved yet.
RefreshMetadata,
/// Emitted when no coordinator connection exists and
/// `coordinator_retry_ready()` permits an attempt. Outside shutdown it also
/// requires `needs_coordinator()`; during shutdown it requires a pending
/// commit or `member_epoch > 0`.
EnsureCoordinator,
/// Emitted when the runtime has a group subscription and its heartbeat
/// state reports that a heartbeat should be sent.
Heartbeat,
/// Emitted when `pending_commits` is non-empty (during shutdown, only once a
/// coordinator connection exists).
Commit,
/// Emitted once per partition leader when a poll is waiting and no records
/// are buffered.
Fetch {
// Leader shared by every partition in `partitions`.
leader_id: i32,
// Assigned, non-paused partitions whose `leader_id` equals the one above.
partitions: Vec<AssignedPartition>,
},
/// Emitted only during shutdown, after pending commits are drained, when
/// `member_epoch > 0` and a coordinator connection exists.
LeaveGroup,
}