use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crabka_client_core::Client;
use crabka_protocol::owned::metadata_request::MetadataRequest;
use crabka_protocol::owned::share_group_heartbeat_request::ShareGroupHeartbeatRequest;
use crabka_protocol::primitives::uuid::Uuid as WireUuid;
use super::coordinator::ShareCoordinatorState;
use super::types::ShareAckMode;
use crate::error::ConsumerError;
pub struct ShareConsumer {
pub(crate) client: Client,
pub(crate) group_id: String,
pub(crate) member_id: String,
#[allow(dead_code)]
pub(crate) member_epoch: Arc<Mutex<i32>>,
pub(crate) assignment: Arc<Mutex<Vec<(WireUuid, String, i32)>>>,
pub(crate) topic_names: Arc<Mutex<HashMap<WireUuid, String>>>,
pub(crate) share_session_epoch: i32,
pub(crate) ack_mode: ShareAckMode,
pub(crate) pending_acks: Vec<(WireUuid, i32, i64, i64, i8)>,
pub(crate) prev_delivered: Vec<(WireUuid, i32, i64, i64)>,
pub(crate) shutdown: CancellationToken,
pub(crate) hb_handle: Option<JoinHandle<()>>,
}
#[bon::bon]
impl ShareConsumer {
#[builder(start_fn = builder, finish_fn = build)]
pub async fn start(
#[builder(into)] bootstrap: String,
#[builder(into, default = "crabka-share-consumer".to_string())] client_id: String,
#[builder(into)] group_id: String,
#[builder(into)] subscribe: Vec<String>,
#[builder(default = ShareAckMode::Implicit)] ack_mode: ShareAckMode,
#[builder(default = std::time::Duration::from_secs(45))] session_timeout: Duration,
#[builder(default = std::time::Duration::from_secs(3))] heartbeat_interval: Duration,
security: Option<crabka_client_core::security::ClientSecurity>,
) -> Result<Self, ConsumerError> {
let _ = session_timeout;
if subscribe.is_empty() {
return Err(ConsumerError::NotSubscribed);
}
if group_id.is_empty() {
return Err(ConsumerError::RebalanceFailed("group_id required".into()));
}
let client = Client::builder()
.bootstrap(&bootstrap)
.client_id(client_id.clone())
.maybe_security(security.clone())
.build()
.await?;
let join = client
.send(ShareGroupHeartbeatRequest {
group_id: group_id.clone(),
member_id: String::new(),
member_epoch: 0,
subscribed_topic_names: Some(subscribe.clone()),
..Default::default()
})
.await?;
if join.error_code != 0 {
return Err(ConsumerError::Server(join.error_code));
}
let member_id = join.member_id.clone().unwrap_or_default();
if member_id.is_empty() {
return Err(ConsumerError::RebalanceFailed(
"broker did not assign a member_id".into(),
));
}
let member_epoch_val = join.member_epoch;
let hb_interval = if join.heartbeat_interval_ms > 0 {
Duration::from_millis(u64::try_from(join.heartbeat_interval_ms).unwrap_or(0))
} else {
heartbeat_interval
};
let md = client.send(MetadataRequest::default()).await?;
let mut topic_names: HashMap<WireUuid, String> = HashMap::new();
for t in &md.topics {
if let Some(name) = &t.name {
topic_names.insert(t.topic_id, name.clone());
}
}
let mut assignment_vec: Vec<(WireUuid, String, i32)> = Vec::new();
if let Some(assignment) = join.assignment {
for tp in &assignment.topic_partitions {
let name = topic_names.get(&tp.topic_id).cloned().unwrap_or_default();
for &partition in &tp.partitions {
assignment_vec.push((tp.topic_id, name.clone(), partition));
}
}
}
let member_epoch = Arc::new(Mutex::new(member_epoch_val));
let assignment = Arc::new(Mutex::new(assignment_vec));
let topic_names = Arc::new(Mutex::new(topic_names));
let shutdown = CancellationToken::new();
let coordinator_client = Client::builder()
.bootstrap(&bootstrap)
.client_id(client_id.clone())
.maybe_security(security.clone())
.build()
.await?;
let state = ShareCoordinatorState {
client: coordinator_client,
group_id: group_id.clone(),
member_id: member_id.clone(),
member_epoch: Arc::clone(&member_epoch),
assignment: Arc::clone(&assignment),
topic_names: Arc::clone(&topic_names),
subscribe,
heartbeat_interval: hb_interval,
};
let hb_handle = tokio::spawn(super::coordinator::run(state, shutdown.clone()));
Ok(ShareConsumer {
client,
group_id,
member_id,
member_epoch,
assignment,
topic_names,
share_session_epoch: 0,
ack_mode,
pending_acks: Vec::new(),
prev_delivered: Vec::new(),
shutdown,
hb_handle: Some(hb_handle),
})
}
}
impl ShareConsumer {
#[must_use]
pub fn group_id(&self) -> &str {
&self.group_id
}
#[must_use]
pub fn member_id(&self) -> &str {
&self.member_id
}
pub async fn assignment(&self) -> Vec<(String, i32)> {
self.assignment
.lock()
.await
.iter()
.map(|(_, name, p)| (name.clone(), *p))
.collect()
}
pub async fn close(&mut self) -> Result<(), ConsumerError> {
if self.ack_mode == ShareAckMode::Implicit {
for (tid, partition, first, last) in std::mem::take(&mut self.prev_delivered) {
self.pending_acks.push((
tid,
partition,
first,
last,
super::types::ShareAckType::Accept.wire(),
));
}
}
if let Err(e) = self.flush_pending_acks().await {
tracing::warn!(error = %e, "share consumer close: final acknowledge failed");
}
self.shutdown.cancel();
if let Some(h) = self.hb_handle.take() {
let _ = h.await;
}
Ok(())
}
}