// kafkit-client 0.1.0
//
// Kafka 4.0+ pure Rust client.
// Documentation
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::time::Duration;

use tokio::sync::oneshot;
use tokio::time::Instant;
use uuid::Uuid;

use crate::Result;
use crate::metadata::MetadataCache;
use crate::network::BrokerConnection;
use crate::types::{
    AssignedPartition, CommitOffset, ConsumerRecord, ConsumerRecords, TopicPartitionKey,
};
use crate::{Result as ClientResult, consumer::ConsumerRuntimeEvent};

/// Tag stored in [`PendingCommit::kind`] to distinguish the two kinds of
/// commit request.
///
/// NOTE(review): the variant meanings are inferred from their names —
/// `Manual` presumably marks a commit requested by the application and `Auto`
/// one issued by the periodic auto-commit; confirm against the code that
/// builds `PendingCommit` values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitKind {
    Manual,
    Auto,
}

/// Connection-related state of a consumer: the metadata cache plus the broker
/// connections it currently holds.
///
/// `Default` is derived, so a fresh value has a default `MetadataCache`, an
/// empty connection map, and `None` for both coordinator fields.
#[derive(Default)]
pub struct ConsumerConnectionState {
    /// Metadata cache used by this consumer.
    pub metadata: MetadataCache,
    /// Broker connections keyed by an `i32`.
    /// NOTE(review): the key is presumably the leader broker's node id — confirm.
    pub leader_connections: HashMap<i32, BrokerConnection>,
    /// Connection to the coordinator, when one is held.
    pub coordinator_connection: Option<BrokerConnection>,
    /// NOTE(review): presumably the earliest instant at which the coordinator
    /// may be contacted again after a failure — confirm against its users.
    pub coordinator_retry_at: Option<Instant>,
}

/// Subscription and partition-assignment bookkeeping for a consumer.
pub struct ConsumerAssignmentState {
    /// Current group subscription; starts out as `ConsumerSubscription::None`.
    pub group_subscription: crate::consumer::runtime::ConsumerSubscription,
    /// Ordered set of topic-partition keys.
    /// NOTE(review): presumably the partitions assigned manually by the
    /// application — confirm.
    pub manual_assignment: BTreeSet<TopicPartitionKey>,
    /// Ordered set of topic-partition keys.
    /// NOTE(review): presumably the partitions currently paused — confirm.
    pub paused_partitions: BTreeSet<TopicPartitionKey>,
    /// Per-partition assignment data, keyed by topic-partition.
    pub assignment: HashMap<TopicPartitionKey, AssignedPartition>,
    /// A heartbeat-response assignment held here until it is consumed, if any.
    /// NOTE(review): who applies it is not visible in this file.
    pub pending_assignment:
        Option<kafka_protocol::messages::consumer_group_heartbeat_response::Assignment>,
}

impl Default for ConsumerAssignmentState {
    /// Builds the empty state: no subscription, empty partition sets, an empty
    /// assignment map, and no pending assignment.
    fn default() -> Self {
        // Spelled out by hand because the subscription starts as the explicit
        // `None` variant rather than through a `Default` impl.
        let group_subscription = crate::consumer::runtime::ConsumerSubscription::None;

        Self {
            group_subscription,
            manual_assignment: BTreeSet::default(),
            paused_partitions: BTreeSet::default(),
            assignment: HashMap::default(),
            pending_assignment: None,
        }
    }
}

/// Poll-related state of a consumer.
///
/// `Default` is derived, so a fresh value has no buffered records, no
/// delivered offsets, no pending poll, and `wakeup_pending == false`.
#[derive(Default)]
pub struct ConsumerPollState {
    /// Records held in memory.
    /// NOTE(review): presumably fetched but not yet handed to the application
    /// — confirm.
    pub buffered_records: Vec<ConsumerRecord>,
    /// One `i64` offset per topic-partition.
    /// NOTE(review): whether this is the last delivered offset or the next one
    /// to deliver is not visible here — confirm before relying on it.
    pub delivered_offsets: HashMap<TopicPartitionKey, i64>,
    /// The outstanding poll request (deadline plus reply channel), if any.
    pub pending_poll: Option<PendingPoll>,
    /// NOTE(review): presumably set when a wakeup was requested and not yet
    /// acted on — confirm.
    pub wakeup_pending: bool,
}

/// Lifecycle bookkeeping for a consumer: queued runtime events, queued
/// commits, the auto-commit timestamp, and shutdown state.
pub struct ConsumerLifecycleState {
    /// Queue of runtime events.
    pub runtime_events: VecDeque<ConsumerRuntimeEvent>,
    /// Queue of commit requests that have not been completed yet.
    pub pending_commits: VecDeque<PendingCommit>,
    /// Timestamp associated with auto-commit; initialised to the creation time.
    /// NOTE(review): presumably updated whenever an auto-commit is issued —
    /// confirm.
    pub last_auto_commit: Instant,
    /// Whether shutdown has started; `false` on creation.
    pub shutting_down: bool,
    /// One-shot channel used to report the close result, if one was registered.
    pub close_reply: Option<oneshot::Sender<ClientResult<()>>>,
}

impl ConsumerLifecycleState {
    /// Creates the initial lifecycle state: empty queues, `last_auto_commit`
    /// set to the current instant, not shutting down, and no close reply.
    pub fn new() -> Self {
        Self {
            runtime_events: VecDeque::new(),
            pending_commits: VecDeque::new(),
            last_auto_commit: Instant::now(),
            shutting_down: false,
            close_reply: None,
        }
    }
}

impl Default for ConsumerLifecycleState {
    /// Equivalent to [`ConsumerLifecycleState::new`].
    ///
    /// Written by hand rather than derived because `last_auto_commit` is
    /// seeded with the current time.
    fn default() -> Self {
        Self::new()
    }
}

/// A queued commit request.
pub struct PendingCommit {
    /// Offsets to commit.
    pub offsets: Vec<CommitOffset>,
    /// Optional one-shot channel for reporting the commit result.
    /// NOTE(review): presumably `None` when nobody awaits the outcome (e.g.
    /// auto-commits) — confirm.
    pub reply: Option<oneshot::Sender<Result<()>>>,
    /// Kind tag for this commit (see [`CommitKind`]).
    pub kind: CommitKind,
}

/// Group-membership heartbeat state: member identity and epoch, the next
/// heartbeat deadline, and a record of fields already sent.
pub struct HeartbeatState {
    /// Member id; generated as `rust-member-<uuid v4>` by [`HeartbeatState::new`].
    pub member_id: String,
    /// Member epoch; `0` is treated as "joining" (see [`HeartbeatState::is_joining`]).
    pub member_epoch: i32,
    /// Deadline of the next heartbeat; `None` means no deadline is scheduled.
    pub next_heartbeat: Option<Instant>,
    /// Timestamp initialised to the creation time.
    /// NOTE(review): presumably refreshed on every application poll — confirm.
    pub last_application_poll: Instant,
    /// Set to `true` on creation, on subscription change, and on leave.
    /// NOTE(review): presumably tells the heartbeat sender to include the
    /// assignment again — confirm.
    pub assignment_dirty: bool,
    /// Record of heartbeat fields already sent; reset to default on
    /// subscription change and on leave.
    pub sent_fields: HeartbeatSentFields,
}

impl HeartbeatState {
    /// Creates the initial state: a fresh random member id, epoch `0`, a
    /// heartbeat due immediately, a dirty assignment, and no sent fields.
    pub fn new() -> Self {
        Self {
            member_id: format!("rust-member-{}", Uuid::new_v4()),
            member_epoch: 0,
            next_heartbeat: Some(Instant::now()),
            last_application_poll: Instant::now(),
            assignment_dirty: true,
            sent_fields: HeartbeatSentFields::default(),
        }
    }

    /// Makes the next heartbeat due now, marks the assignment dirty, and
    /// forgets which fields were previously sent.
    pub fn mark_subscription_changed(&mut self) {
        self.next_heartbeat = Some(Instant::now());
        self.assignment_dirty = true;
        self.sent_fields = HeartbeatSentFields::default();
    }

    /// Returns `true` when the heartbeat deadline has been reached, or when no
    /// deadline is set at all.
    pub fn should_heartbeat(&self) -> bool {
        self.next_heartbeat
            .map(|deadline| Instant::now() >= deadline)
            .unwrap_or(true)
    }

    /// Returns `true` while the member epoch is `0`.
    pub fn is_joining(&self) -> bool {
        self.member_epoch == 0
    }

    /// Schedules the next heartbeat `retry_backoff` from now.
    pub fn backoff(&mut self, retry_backoff: Duration) {
        self.next_heartbeat = Some(Instant::now() + retry_backoff);
    }

    /// Schedules the next heartbeat `interval` from now, with the interval
    /// clamped to at least one millisecond.
    pub fn mark_heartbeat_success(&mut self, interval: Duration) {
        self.next_heartbeat = Some(Instant::now() + interval.max(Duration::from_millis(1)));
    }

    /// Resets the epoch to `0`, clears the heartbeat deadline, marks the
    /// assignment dirty, and forgets which fields were previously sent.
    ///
    /// The member id is left unchanged.
    pub fn mark_left(&mut self) {
        self.member_epoch = 0;
        self.next_heartbeat = None;
        self.assignment_dirty = true;
        self.sent_fields = HeartbeatSentFields::default();
    }

    /// Returns `true` when `assignor` differs from the server assignor stored
    /// in `sent_fields`.
    pub fn server_assignor_changed(&self, assignor: &Option<String>) -> bool {
        self.sent_fields.server_assignor != *assignor
    }
}

impl Default for HeartbeatState {
    /// Equivalent to [`HeartbeatState::new`]; every call produces a new random
    /// member id and fresh timestamps.
    fn default() -> Self {
        Self::new()
    }
}

/// Record of heartbeat fields, each optional.
///
/// `Default` is derived, so every field starts as `None`. `HeartbeatState`
/// resets this to the default on subscription change and on leave, and compares
/// `server_assignor` against a candidate value.
///
/// NOTE(review): presumably holds the values most recently sent to the broker
/// so unchanged fields can be omitted from later heartbeats — confirm against
/// the heartbeat request builder.
#[derive(Default)]
pub struct HeartbeatSentFields {
    /// Rebalance timeout in milliseconds, if recorded.
    pub rebalance_timeout_ms: Option<i32>,
    /// Ordered set of subscribed topic names, if recorded.
    pub subscribed_topics: Option<BTreeSet<String>>,
    /// Subscription regex, if recorded.
    pub subscribed_topic_regex: Option<String>,
    /// Server-side assignor name, if recorded.
    pub server_assignor: Option<String>,
    /// Snapshot mapping a `Uuid` to a list of `i32`s, if recorded.
    /// NOTE(review): presumably topic id -> partition indexes — confirm.
    pub assignment_snapshot: Option<BTreeMap<Uuid, Vec<i32>>>,
}

/// An outstanding poll request: a deadline plus the channel for its result.
pub struct PendingPoll {
    /// Deadline for this poll.
    /// NOTE(review): presumably the instant by which the poll must be answered
    /// even if no records arrived — confirm.
    pub deadline: Instant,
    /// One-shot channel on which the poll result (records or an error) is sent.
    pub reply: oneshot::Sender<Result<ConsumerRecords>>,
}