use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::Instant;
use uuid::Uuid;
use crate::metadata::MetadataCache;
use crate::network::BrokerConnection;
use crate::types::{
AssignedPartition, CommitOffset, ConsumerRecord, ConsumerRecords, TopicPartitionKey,
};
use crate::{CancellationToken, Result};
use crate::{Result as ClientResult, consumer::ConsumerRuntimeEvent};
/// Tags a [`PendingCommit`] as either a manual or an automatic commit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitKind {
/// NOTE(review): presumably a commit explicitly requested by the application — confirm against callers.
Manual,
/// NOTE(review): presumably a commit issued by the periodic auto-commit path — confirm against callers.
Auto,
}
/// Connection-related state held by the consumer.
///
/// `Default` yields an empty connection map, no coordinator connection, no
/// retry deadline, and whatever `MetadataCache::default()` produces.
#[derive(Default)]
pub struct ConsumerConnectionState {
/// Metadata cache owned by this consumer.
pub metadata: MetadataCache,
/// Open broker connections keyed by an `i32`.
/// NOTE(review): the key is presumably the broker (node) id of a partition leader — confirm.
pub leader_connections: HashMap<i32, BrokerConnection>,
/// Connection used for coordinator traffic, or `None` when none is held.
pub coordinator_connection: Option<BrokerConnection>,
/// NOTE(review): presumably the earliest instant at which the coordinator
/// should be contacted again after a failure — confirm against the runtime loop.
pub coordinator_retry_at: Option<Instant>,
}
/// Subscription and partition-assignment state held by the consumer.
///
/// The `Default` implementation starts with `ConsumerSubscription::None`,
/// empty collections, and no pending assignment.
pub struct ConsumerAssignmentState {
/// Current group subscription; `ConsumerSubscription::None` by default.
pub group_subscription: crate::consumer::runtime::ConsumerSubscription,
/// NOTE(review): presumably partitions assigned directly by the application
/// rather than through the group protocol — confirm.
pub manual_assignment: BTreeSet<TopicPartitionKey>,
/// NOTE(review): presumably partitions for which fetching is currently paused — confirm.
pub paused_partitions: BTreeSet<TopicPartitionKey>,
/// Per-partition assignment data, keyed by topic-partition.
pub assignment: HashMap<TopicPartitionKey, AssignedPartition>,
/// An assignment in the shape carried by a consumer-group heartbeat response.
/// NOTE(review): presumably received from the coordinator but not yet applied — confirm.
pub pending_assignment:
Option<kafka_protocol::messages::consumer_group_heartbeat_response::Assignment>,
}
impl Default for ConsumerAssignmentState {
    /// Builds an assignment state with no subscription, no manual, paused or
    /// assigned partitions, and no pending assignment.
    fn default() -> Self {
        // The subscription enum is the only field that needs an explicit
        // variant; every collection simply starts out empty.
        let group_subscription = crate::consumer::runtime::ConsumerSubscription::None;
        let manual_assignment = BTreeSet::default();
        let paused_partitions = BTreeSet::default();
        let assignment = HashMap::default();

        Self {
            group_subscription,
            manual_assignment,
            paused_partitions,
            assignment,
            pending_assignment: None,
        }
    }
}
/// State related to serving application `poll` requests.
///
/// `Default` yields empty buffers, no pending poll, and `wakeup_pending == false`.
#[derive(Default)]
pub struct ConsumerPollState {
/// Records held by the consumer.
/// NOTE(review): presumably fetched but not yet handed to the application — confirm.
pub buffered_records: Vec<ConsumerRecord>,
/// One `i64` offset per topic-partition.
/// NOTE(review): presumably the latest offset delivered to the application — confirm
/// whether it is the last delivered offset or the next offset to consume.
pub delivered_offsets: HashMap<TopicPartitionKey, i64>,
/// The outstanding poll request, if any (see [`PendingPoll`]).
pub pending_poll: Option<PendingPoll>,
/// NOTE(review): presumably set when a wakeup was requested and not yet consumed by a poll — confirm.
pub wakeup_pending: bool,
}
/// Queued work and shutdown bookkeeping for the consumer.
///
/// Constructed with [`ConsumerLifecycleState::new`].
pub struct ConsumerLifecycleState {
/// Queue of runtime events; starts empty.
pub runtime_events: VecDeque<ConsumerRuntimeEvent>,
/// Queue of commit requests (see [`PendingCommit`]); starts empty.
pub pending_commits: VecDeque<PendingCommit>,
/// Initialised to the construction time by `new()`.
/// NOTE(review): presumably updated each time an auto-commit is issued — confirm.
pub last_auto_commit: Instant,
/// `false` on construction.
/// NOTE(review): presumably set once a close has been requested — confirm.
pub shutting_down: bool,
/// One-shot sender for a unit result; `None` on construction.
/// NOTE(review): presumably used to report the outcome of a close request — confirm.
pub close_reply: Option<oneshot::Sender<ClientResult<()>>>,
}
impl ConsumerLifecycleState {
    /// Creates the lifecycle state for a new consumer.
    ///
    /// Both queues start empty, `shutting_down` is `false`, no close reply is
    /// registered, and `last_auto_commit` is stamped with the current time.
    pub fn new() -> Self {
        Self {
            runtime_events: VecDeque::new(),
            pending_commits: VecDeque::new(),
            last_auto_commit: Instant::now(),
            shutting_down: false,
            close_reply: None,
        }
    }
}

/// A public zero-argument `new()` should be mirrored by `Default` so the type
/// works with `..Default::default()`, `unwrap_or_default()` and derive-based
/// containers; this is equivalent to [`ConsumerLifecycleState::new`].
impl Default for ConsumerLifecycleState {
    fn default() -> Self {
        Self::new()
    }
}
/// A queued offset-commit request.
pub struct PendingCommit {
/// Offsets to commit.
pub offsets: Vec<CommitOffset>,
/// Optional cancellation token associated with the request.
/// NOTE(review): presumably lets the caller abandon the commit before it completes — confirm.
pub cancellation: Option<CancellationToken>,
/// Optional one-shot sender for a unit result.
/// NOTE(review): presumably `None` when nobody is waiting for the outcome — confirm.
pub reply: Option<oneshot::Sender<Result<()>>>,
/// Whether this commit is tagged as manual or automatic.
pub kind: CommitKind,
}
/// Group-membership and heartbeat scheduling state.
///
/// Constructed with [`HeartbeatState::new`].
pub struct HeartbeatState {
/// Member identifier; `new()` generates it as `rust-member-<uuid v4>`.
pub member_id: String,
/// Member epoch; `0` on construction and after `mark_left()`, and
/// `is_joining()` reports `true` while it is `0`.
pub member_epoch: i32,
/// Deadline for the next heartbeat. `None` is treated as "due now" by
/// `should_heartbeat()`.
pub next_heartbeat: Option<Instant>,
/// Initialised to the construction time by `new()`.
/// NOTE(review): presumably refreshed on every application poll — confirm.
pub last_application_poll: Instant,
/// Set to `true` on construction, on subscription change, and on leaving.
/// NOTE(review): presumably cleared once the current assignment has been reported — confirm.
pub assignment_dirty: bool,
/// Record of heartbeat field values (see [`HeartbeatSentFields`]); reset to
/// its default on subscription change and on leaving.
pub sent_fields: HeartbeatSentFields,
}
impl HeartbeatState {
    /// Creates the state for a member that has not joined a group yet.
    ///
    /// A fresh `rust-member-<uuid v4>` id is generated, the epoch is `0`, the
    /// first heartbeat is due immediately, and the assignment is marked dirty.
    pub fn new() -> Self {
        // Sample the clock once so both timestamps describe the same instant.
        let now = Instant::now();
        Self {
            member_id: format!("rust-member-{}", Uuid::new_v4()),
            member_epoch: 0,
            next_heartbeat: Some(now),
            last_application_poll: now,
            assignment_dirty: true,
            sent_fields: HeartbeatSentFields::default(),
        }
    }

    /// Records a subscription change: the next heartbeat becomes due
    /// immediately, the assignment is marked dirty, and the record of sent
    /// fields is cleared.
    pub fn mark_subscription_changed(&mut self) {
        self.next_heartbeat = Some(Instant::now());
        self.invalidate_sent_state();
    }

    /// Returns `true` when the heartbeat deadline has been reached.
    ///
    /// A missing deadline (`None`) counts as due, so this also returns `true`
    /// after [`mark_left`](Self::mark_left) has cleared the deadline.
    pub fn should_heartbeat(&self) -> bool {
        match self.next_heartbeat {
            Some(deadline) => Instant::now() >= deadline,
            None => true,
        }
    }

    /// Returns `true` while the member epoch is `0`.
    pub fn is_joining(&self) -> bool {
        self.member_epoch == 0
    }

    /// Postpones the next heartbeat until `retry_backoff` from now.
    pub fn backoff(&mut self, retry_backoff: Duration) {
        self.next_heartbeat = Some(Instant::now() + retry_backoff);
    }

    /// Schedules the next heartbeat `interval` from now.
    ///
    /// The interval is clamped to at least one millisecond, so a zero
    /// interval still places the deadline strictly in the future.
    pub fn mark_heartbeat_success(&mut self, interval: Duration) {
        let effective = interval.max(Duration::from_millis(1));
        self.next_heartbeat = Some(Instant::now() + effective);
    }

    /// Resets membership after leaving: the epoch returns to `0`, the
    /// heartbeat deadline is cleared, the assignment is marked dirty, and the
    /// record of sent fields is cleared.
    pub fn mark_left(&mut self) {
        self.member_epoch = 0;
        self.next_heartbeat = None;
        self.invalidate_sent_state();
    }

    /// Returns `true` when `assignor` differs from the server assignor stored
    /// in `sent_fields`.
    pub fn server_assignor_changed(&self, assignor: &Option<String>) -> bool {
        &self.sent_fields.server_assignor != assignor
    }

    /// Marks the assignment dirty and forgets every recorded sent field.
    fn invalidate_sent_state(&mut self) {
        self.assignment_dirty = true;
        self.sent_fields = HeartbeatSentFields::default();
    }
}

/// A public zero-argument `new()` should be mirrored by `Default`; this is
/// equivalent to [`HeartbeatState::new`], so every call yields a new member id.
impl Default for HeartbeatState {
    fn default() -> Self {
        Self::new()
    }
}
/// Record of heartbeat field values kept in [`HeartbeatState::sent_fields`].
///
/// `Default` sets every field to `None`; `HeartbeatState` resets to that
/// default on subscription change and on leaving.
/// NOTE(review): presumably each field holds the value last sent to the
/// coordinator so unchanged fields can be omitted — confirm against the
/// heartbeat request builder.
#[derive(Default)]
pub struct HeartbeatSentFields {
/// Recorded rebalance timeout in milliseconds, if any.
pub rebalance_timeout_ms: Option<i32>,
/// Recorded set of subscribed topic names, if any.
pub subscribed_topics: Option<BTreeSet<String>>,
/// Recorded subscription regex, if any.
pub subscribed_topic_regex: Option<String>,
/// Recorded server-side assignor name; compared by
/// `HeartbeatState::server_assignor_changed`.
pub server_assignor: Option<String>,
/// Recorded assignment as a map from a `Uuid` to a list of `i32`.
/// NOTE(review): presumably topic id -> partition indexes — confirm.
pub assignment_snapshot: Option<BTreeMap<Uuid, Vec<i32>>>,
}
/// An outstanding poll request stored in [`ConsumerPollState::pending_poll`].
pub struct PendingPoll {
/// Deadline associated with the poll.
/// NOTE(review): presumably the instant at which the poll times out — confirm.
pub deadline: Instant,
/// Optional cancellation token associated with the poll.
pub cancellation: Option<CancellationToken>,
/// One-shot sender used to deliver the poll result (`ConsumerRecords` or an error).
pub reply: oneshot::Sender<Result<ConsumerRecords>>,
}