// kafkit-client 0.1.7
//
// Kafka 4.0+ pure Rust client.
// Documentation
use std::collections::HashMap;
use std::time::Duration;

use tokio::time::Instant;

use super::runtime::ConsumerRuntime;
use crate::types::AssignedPartition;

/// Field-less namespace type for the consumer's request-planning functions.
///
/// It holds no data of its own: the associated functions implemented on it
/// take a `&ConsumerRuntime` and read everything they need from that value.
pub struct ConsumerRequestManagers;

impl ConsumerRequestManagers {
    pub fn poll(runtime: &ConsumerRuntime) -> Vec<ConsumerNetworkAction> {
        let mut actions = Vec::new();

        if runtime.lifecycle.shutting_down {
            if !runtime.lifecycle.pending_commits.is_empty() {
                if runtime.connections.coordinator_connection.is_none() {
                    if runtime.coordinator_retry_ready() {
                        actions.push(ConsumerNetworkAction::EnsureCoordinator);
                    }
                    return actions;
                }
                actions.push(ConsumerNetworkAction::Commit);
                return actions;
            }

            if runtime.heartbeat_state.member_epoch > 0 {
                if runtime.connections.coordinator_connection.is_none() {
                    if runtime.coordinator_retry_ready() {
                        actions.push(ConsumerNetworkAction::EnsureCoordinator);
                    }
                    return actions;
                }
                actions.push(ConsumerNetworkAction::LeaveGroup);
            }
            return actions;
        }

        let desired_topics = runtime.desired_topics();
        let pending_assignment_unresolved = runtime
            .assignment_state
            .pending_assignment
            .as_ref()
            .is_some_and(|assignment| !runtime.can_resolve_assignment(assignment));

        if (!desired_topics.is_empty()
            && runtime
                .connections
                .metadata
                .needs_any_refresh(desired_topics, runtime.config.metadata_max_age))
            || (runtime.is_pattern_subscription()
                && runtime
                    .connections
                    .metadata
                    .is_stale(runtime.config.metadata_max_age))
            || pending_assignment_unresolved
        {
            actions.push(ConsumerNetworkAction::RefreshMetadata);
        }

        if runtime.needs_coordinator() && runtime.connections.coordinator_connection.is_none() {
            if runtime.coordinator_retry_ready() {
                actions.push(ConsumerNetworkAction::EnsureCoordinator);
            }
            return actions;
        }

        if runtime.has_group_subscription() && runtime.heartbeat_state.should_heartbeat() {
            actions.push(ConsumerNetworkAction::Heartbeat);
        }

        if !runtime.lifecycle.pending_commits.is_empty() {
            actions.push(ConsumerNetworkAction::Commit);
        }

        if runtime.poll_state.pending_poll.is_some()
            && runtime.poll_state.buffered_records.is_empty()
            && !runtime.assignment_state.assignment.is_empty()
            && runtime.assignment_state.pending_assignment.is_none()
            && !actions.iter().any(|action| {
                matches!(
                    action,
                    ConsumerNetworkAction::RefreshMetadata
                        | ConsumerNetworkAction::EnsureCoordinator
                        | ConsumerNetworkAction::Heartbeat
                )
            })
        {
            let mut by_leader = HashMap::<i32, Vec<AssignedPartition>>::new();
            for assigned in runtime.assignment_state.assignment.values() {
                if runtime
                    .assignment_state
                    .paused_partitions
                    .contains(&assigned.key)
                {
                    continue;
                }
                by_leader
                    .entry(assigned.leader_id)
                    .or_default()
                    .push(assigned.clone());
            }

            for (leader_id, partitions) in by_leader {
                actions.push(ConsumerNetworkAction::Fetch {
                    leader_id,
                    partitions,
                });
            }
        }

        actions
    }

    pub fn next_wakeup(runtime: &ConsumerRuntime) -> Duration {
        if !runtime.lifecycle.runtime_events.is_empty()
            || (!runtime.poll_state.buffered_records.is_empty()
                && runtime.poll_state.pending_poll.is_some())
            || !runtime.lifecycle.pending_commits.is_empty()
            || runtime.assignment_state.pending_assignment.is_some()
        {
            return Duration::ZERO;
        }

        let mut wake = Duration::from_millis(250);

        if let Some(deadline) = runtime
            .poll_state
            .pending_poll
            .as_ref()
            .map(|poll| poll.deadline)
        {
            wake = wake.min(deadline.saturating_duration_since(Instant::now()));
        }

        if let Some(heartbeat_due) = runtime.heartbeat_state.next_heartbeat {
            wake = wake.min(heartbeat_due.saturating_duration_since(Instant::now()));
        }

        if runtime.needs_coordinator() && runtime.connections.coordinator_connection.is_none() {
            let coordinator_wake = runtime
                .connections
                .coordinator_retry_at
                .map(|deadline| deadline.saturating_duration_since(Instant::now()))
                .unwrap_or(Duration::ZERO);
            wake = wake.min(coordinator_wake);
        }

        if !runtime.lifecycle.shutting_down
            && runtime.config.enable_auto_commit
            && !runtime.poll_state.delivered_offsets.is_empty()
        {
            let due = runtime.lifecycle.last_auto_commit + runtime.config.auto_commit_interval;
            wake = wake.min(due.saturating_duration_since(Instant::now()));
        }

        wake
    }
}

/// A unit of network work planned by `ConsumerRequestManagers::poll`.
///
/// The variant docs describe when `poll` emits each action; how an action is
/// carried out is decided by whoever consumes the returned list.
pub enum ConsumerNetworkAction {
    /// Emitted when the desired topics need a metadata refresh, when a pattern
    /// subscription's metadata is stale, or when a pending assignment cannot
    /// be resolved yet.
    RefreshMetadata,
    /// Emitted when a coordinator connection is required but absent and the
    /// runtime reports that a coordinator retry is allowed.
    EnsureCoordinator,
    /// Emitted when there is a group subscription and the heartbeat state
    /// reports that a heartbeat should be sent.
    Heartbeat,
    /// Emitted when commits are pending; `poll` holds it back while it is
    /// still waiting on a coordinator connection.
    Commit,
    /// Emitted once per distinct `leader_id` among the assigned partitions
    /// that are not paused.
    Fetch {
        /// The `leader_id` shared by every partition in `partitions`.
        leader_id: i32,
        /// Clones of the non-paused assigned partitions with that leader.
        partitions: Vec<AssignedPartition>,
    },
    /// Emitted during shutdown, once no commits are pending, if the member
    /// epoch is still greater than zero and a coordinator connection exists.
    LeaveGroup,
}