use std::collections::HashMap;
use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use crabka_client_core::{Client, Connection, ConnectionOptions, fetch_partition_with_isolation};
use crabka_client_producer::{Acks, Producer, ProducerError, ProducerRecord, RecordMetadata};
use crabka_protocol::primitives::uuid::Uuid as WireUuid;
use crabka_protocol::owned::list_offsets_request::{
ListOffsetsPartition, ListOffsetsRequest, ListOffsetsTopic,
};
use crabka_protocol::owned::metadata_request::{MetadataRequest, MetadataRequestTopic};
use crabka_protocol::owned::offset_commit_request::{
OffsetCommitRequest, OffsetCommitRequestPartition, OffsetCommitRequestTopic,
};
use crabka_protocol::owned::offset_fetch_request::{
OffsetFetchRequest, OffsetFetchRequestGroup, OffsetFetchRequestTopic, OffsetFetchRequestTopics,
};
use crate::error::StreamsClientError;
use crate::runtime::eos::{StreamsGroupMeta, TransactionalProducer};
use crate::runtime::io::{
FetchBatch, FetchedRec, IsolationLevel, OffsetStore, RecordFetcher, RecordProducer,
};
/// Fetches records from a broker and lists topic partitions.
///
/// Implements `RecordFetcher`; instances are assembled by `build` / `build_eos`.
pub(crate) struct BrokerFetcher {
/// Connection handed to `fetch_partition_with_isolation` for every fetch.
conn: Connection,
/// Client used for Metadata requests and metadata refreshes.
client: Client,
/// Cache of topic name -> topic id, filled from metadata refreshes.
topic_ids: Mutex<HashMap<String, WireUuid>>,
/// Forwarded unchanged to `fetch_partition_with_isolation` on every fetch.
max_wait_ms: i32,
/// Forwarded unchanged to `fetch_partition_with_isolation` on every fetch.
partition_max_bytes: i32,
}
#[async_trait::async_trait]
impl RecordFetcher for BrokerFetcher {
    /// Fetches records of `topic`/`partition` starting at `offset`.
    ///
    /// The topic id is looked up through the local cache (refreshing metadata
    /// on a miss) and the request is issued on the dedicated fetch connection.
    ///
    /// # Errors
    /// Propagates any failure from topic-id resolution or from the fetch itself.
    async fn fetch(
        &self,
        topic: &str,
        partition: i32,
        offset: i64,
        isolation: IsolationLevel,
    ) -> Result<FetchBatch, StreamsClientError> {
        let topic_id = self.resolve_topic_id(topic).await?;

        // Numeric isolation level expected by `fetch_partition_with_isolation`.
        let wire_isolation: i8 = match isolation {
            IsolationLevel::ReadCommitted => 1,
            IsolationLevel::ReadUncommitted => 0,
        };

        let raw = fetch_partition_with_isolation(
            &self.conn,
            topic,
            topic_id,
            partition,
            offset,
            self.max_wait_ms,
            self.partition_max_bytes,
            wire_isolation,
        )
        .await?;

        // Re-shape the client-level records into the runtime's record type.
        Ok(FetchBatch {
            records: raw
                .into_iter()
                .map(|rec| FetchedRec {
                    offset: rec.offset,
                    key: rec.key,
                    value: rec.value,
                    timestamp: rec.timestamp,
                })
                .collect(),
        })
    }

    /// Returns the partition indexes `0..n` of `topic`, where `n` is the number
    /// of partitions reported by a Metadata request.
    ///
    /// A topic reported with zero partitions still yields `[0]`, because the
    /// count is clamped to at least one.
    ///
    /// # Errors
    /// Fails when the request fails or the topic is absent from the response.
    async fn partitions(&self, topic: &str) -> Result<Vec<i32>, StreamsClientError> {
        let request = MetadataRequest {
            topics: Some(vec![MetadataRequestTopic {
                name: Some(topic.to_string()),
                ..Default::default()
            }]),
            ..Default::default()
        };
        let resp = self.client.send(request).await?;

        let entry = resp
            .topics
            .iter()
            .find(|t| t.name.as_deref() == Some(topic));
        let partition_count = match entry {
            Some(t) => t.partitions.len(),
            None => {
                return Err(StreamsClientError::Runtime(format!(
                    "Metadata: topic {topic} not present in response"
                )));
            }
        };

        // Clamp to >= 1; a count that does not fit in i32 also falls back to 1.
        let upper = i32::try_from(partition_count.max(1)).unwrap_or(1);
        Ok((0..upper).collect())
    }
}
impl BrokerFetcher {
    /// Resolves the id of `topic`, consulting the cache before the broker.
    ///
    /// On a cache miss the client's metadata is refreshed and every named
    /// topic in it is cached. If the topic is still unknown afterwards the
    /// default (all-zero) id is returned rather than an error.
    async fn resolve_topic_id(&self, topic: &str) -> Result<WireUuid, StreamsClientError> {
        // The lock guard is a temporary here, so it is released before the
        // metadata refresh below is awaited.
        let cached = self.topic_ids.lock().await.get(topic).copied();
        if let Some(id) = cached {
            return Ok(id);
        }

        let meta = self.client.refresh_metadata().await?;

        let mut ids = self.topic_ids.lock().await;
        for entry in &meta.topics {
            if let Some(name) = entry.name.as_ref() {
                ids.insert(name.clone(), entry.topic_id);
            }
        }
        let resolved = ids.get(topic).copied();
        Ok(resolved.unwrap_or_default())
    }
}
/// Non-transactional producer wrapper that keeps track of outstanding acks.
pub(crate) struct BrokerProducer {
/// Underlying producer that records are handed to.
inner: Producer,
/// Ack receivers returned by `inner.send`; pushed by `send` /
/// `send_with_timestamp` and drained and awaited by `flush`.
pending: Mutex<Vec<oneshot::Receiver<Result<RecordMetadata, ProducerError>>>>,
}
#[async_trait::async_trait]
impl RecordProducer for BrokerProducer {
    /// Hands a record to the underlying producer and remembers its ack
    /// receiver so that `flush` can later report delivery failures.
    ///
    /// This method itself never returns an error.
    async fn send(
        &self,
        topic: &str,
        partition: Option<i32>,
        key: Option<Bytes>,
        value: Option<Bytes>,
    ) -> Result<(), StreamsClientError> {
        let record = ProducerRecord {
            topic: topic.to_owned(),
            partition,
            key,
            value,
            ..Default::default()
        };
        let ack_rx = self.inner.send(record).await;

        let mut outstanding = self.pending.lock().await;
        outstanding.push(ack_rx);
        Ok(())
    }

    /// Same as `send`, but also sets the record's `timestamp_ms`.
    async fn send_with_timestamp(
        &self,
        topic: &str,
        partition: Option<i32>,
        key: Option<Bytes>,
        value: Option<Bytes>,
        timestamp_ms: Option<i64>,
    ) -> Result<(), StreamsClientError> {
        let record = ProducerRecord {
            topic: topic.to_owned(),
            partition,
            key,
            value,
            timestamp_ms,
            ..Default::default()
        };
        let ack_rx = self.inner.send(record).await;

        let mut outstanding = self.pending.lock().await;
        outstanding.push(ack_rx);
        Ok(())
    }

    /// Flushes the underlying producer, then awaits every outstanding ack.
    ///
    /// # Errors
    /// Returns the first failure encountered: a failed flush, a failed ack, or
    /// an ack channel whose sender was dropped. If the flush itself fails the
    /// outstanding receivers are left in place; otherwise they are all taken,
    /// and any not yet awaited when an error is hit are discarded.
    async fn flush(&self) -> Result<(), StreamsClientError> {
        if let Err(e) = self.inner.flush().await {
            return Err(StreamsClientError::Runtime(e.to_string()));
        }

        // Swap the list out under the lock; the guard is released before any
        // receiver is awaited.
        let outstanding = std::mem::take(&mut *self.pending.lock().await);
        for ack_rx in outstanding {
            let ack = ack_rx.await.map_err(|_| {
                StreamsClientError::Runtime("produce ack receiver dropped".to_string())
            })?;
            if let Err(e) = ack {
                return Err(StreamsClientError::Runtime(format!(
                    "produce ack failed: {e}"
                )));
            }
        }
        Ok(())
    }
}
/// Producer wrapper that implements both `RecordProducer` and
/// `TransactionalProducer` on top of a single `Producer`.
pub(crate) struct BrokerTransactionalProducer {
/// Underlying producer; `build_eos` configures it with a transactional id.
inner: Producer,
}
#[async_trait::async_trait]
impl RecordProducer for BrokerTransactionalProducer {
    /// Hands a record to the underlying producer.
    ///
    /// The per-record ack receiver is discarded immediately, so delivery
    /// failures are not reported through this method.
    // NOTE(review): presumably failures surface via `commit_transaction` — confirm.
    async fn send(
        &self,
        topic: &str,
        partition: Option<i32>,
        key: Option<Bytes>,
        value: Option<Bytes>,
    ) -> Result<(), StreamsClientError> {
        let record = ProducerRecord {
            topic: topic.to_owned(),
            partition,
            key,
            value,
            ..Default::default()
        };
        // `let _ =` drops the returned receiver right away.
        let _ = self.inner.send(record).await;
        Ok(())
    }

    /// Same as `send`, but also sets the record's `timestamp_ms`.
    async fn send_with_timestamp(
        &self,
        topic: &str,
        partition: Option<i32>,
        key: Option<Bytes>,
        value: Option<Bytes>,
        timestamp_ms: Option<i64>,
    ) -> Result<(), StreamsClientError> {
        let record = ProducerRecord {
            topic: topic.to_owned(),
            partition,
            key,
            value,
            timestamp_ms,
            ..Default::default()
        };
        let _ = self.inner.send(record).await;
        Ok(())
    }

    /// Does nothing and always succeeds.
    async fn flush(&self) -> Result<(), StreamsClientError> {
        Ok(())
    }
}
#[async_trait::async_trait]
impl TransactionalProducer for BrokerTransactionalProducer {
    /// Delegates to the producer's `init_transactions`; failures are reported
    /// as `StreamsClientError::Runtime` carrying the error's text.
    async fn init_transactions(&self) -> Result<(), StreamsClientError> {
        match self.inner.init_transactions().await {
            Ok(()) => Ok(()),
            Err(err) => Err(StreamsClientError::Runtime(err.to_string())),
        }
    }

    /// Delegates to the producer's `begin_transaction`.
    async fn begin_transaction(&self) -> Result<(), StreamsClientError> {
        match self.inner.begin_transaction().await {
            Ok(()) => Ok(()),
            Err(err) => Err(StreamsClientError::Runtime(err.to_string())),
        }
    }

    /// Adds consumed offsets to the current transaction.
    ///
    /// `offsets` holds `(topic, partition, offset)` triples and `m` carries the
    /// group identity, which is copied into a `ConsumerGroupMetadata`.
    async fn send_offsets_to_transaction(
        &self,
        offsets: &[(String, i32, i64)],
        m: &StreamsGroupMeta,
    ) -> Result<(), StreamsClientError> {
        let group_meta = crabka_client_consumer::ConsumerGroupMetadata {
            group_id: m.group_id.clone(),
            generation_id: m.generation_id,
            member_id: m.member_id.clone(),
            group_instance_id: m.group_instance_id.clone(),
        };
        // Re-key each triple as ((topic, partition), offset) for the producer.
        let entries = offsets
            .iter()
            .map(|(topic, partition, offset)| ((topic.clone(), *partition), *offset));

        match self
            .inner
            .send_offsets_to_transaction(entries, &group_meta)
            .await
        {
            Ok(()) => Ok(()),
            Err(err) => Err(StreamsClientError::Runtime(err.to_string())),
        }
    }

    /// Delegates to the producer's `commit_transaction`.
    async fn commit_transaction(&self) -> Result<(), StreamsClientError> {
        match self.inner.commit_transaction().await {
            Ok(()) => Ok(()),
            Err(err) => Err(StreamsClientError::Runtime(err.to_string())),
        }
    }

    /// Delegates to the producer's `abort_transaction`.
    async fn abort_transaction(&self) -> Result<(), StreamsClientError> {
        match self.inner.abort_transaction().await {
            Ok(()) => Ok(()),
            Err(err) => Err(StreamsClientError::Runtime(err.to_string())),
        }
    }
}
/// Reads and commits group offsets through a broker `Client`.
pub(crate) struct BrokerOffsetStore {
/// Client used for OffsetFetch, OffsetCommit, ListOffsets and metadata refreshes.
client: Client,
/// Group id sent with every OffsetFetch and OffsetCommit request.
group_id: String,
/// Cache of topic name -> topic id, filled from metadata refreshes.
topic_ids: Mutex<HashMap<String, WireUuid>>,
}
impl BrokerOffsetStore {
    /// Creates an offset store for `group_id` that talks through `client`.
    /// The topic-id cache starts out empty.
    pub(crate) fn new(client: Client, group_id: impl Into<String>) -> Self {
        let group_id = group_id.into();
        let topic_ids = Mutex::new(HashMap::new());
        Self {
            client,
            group_id,
            topic_ids,
        }
    }

    /// Resolves the id of `topic`, consulting the cache before the broker.
    ///
    /// A miss triggers a metadata refresh whose named topics are all cached.
    /// A topic that is still unknown yields the default (all-zero) id instead
    /// of an error.
    async fn resolve_topic_id(&self, topic: &str) -> Result<WireUuid, StreamsClientError> {
        // Temporary guard: released before the refresh below is awaited.
        let hit = self.topic_ids.lock().await.get(topic).copied();
        if let Some(id) = hit {
            return Ok(id);
        }

        let meta = self.client.refresh_metadata().await?;

        let mut ids = self.topic_ids.lock().await;
        for entry in &meta.topics {
            if let Some(name) = entry.name.as_ref() {
                ids.insert(name.clone(), entry.topic_id);
            }
        }
        let resolved = ids.get(topic).copied();
        Ok(resolved.unwrap_or_default())
    }
}
/// `ListOffsets` timestamp sentinel requesting the earliest available offset.
const LIST_OFFSETS_EARLIEST: i64 = -2;
/// `ListOffsets` timestamp sentinel requesting the latest (log-end) offset.
const LIST_OFFSETS_LATEST: i64 = -1;

/// Maps a negative committed offset (the "nothing committed" marker) to `None`.
fn committed_or_none(offset: i64) -> Option<i64> {
    if offset < 0 {
        None
    } else {
        Some(offset)
    }
}

#[async_trait::async_trait]
impl OffsetStore for BrokerOffsetStore {
    /// Returns the offset committed by this store's group for
    /// `topic`/`partition`, or `None` when nothing has been committed.
    ///
    /// The request fills both the top-level `topics` list (addressed by name)
    /// and the `groups` list (addressed by name and topic id); whichever shape
    /// the response uses is then searched for the partition.
    ///
    /// # Errors
    /// Propagates topic-id resolution and request failures.
    // NOTE(review): error codes carried in the response are not inspected
    // here; only `committed_offset` is read.
    async fn committed(
        &self,
        topic: &str,
        partition: i32,
    ) -> Result<Option<i64>, StreamsClientError> {
        let topic_id = self.resolve_topic_id(topic).await?;
        let resp = self
            .client
            .send(OffsetFetchRequest {
                group_id: self.group_id.clone(),
                topics: Some(vec![OffsetFetchRequestTopic {
                    name: topic.to_string(),
                    partition_indexes: vec![partition],
                    ..Default::default()
                }]),
                groups: vec![OffsetFetchRequestGroup {
                    group_id: self.group_id.clone(),
                    topics: Some(vec![OffsetFetchRequestTopics {
                        name: topic.to_string(),
                        topic_id,
                        partition_indexes: vec![partition],
                        ..Default::default()
                    }]),
                    ..Default::default()
                }],
                ..Default::default()
            })
            .await?;

        // Response without groups: results sit in the top-level `topics`
        // list and are matched by name only.
        if resp.groups.is_empty() {
            for t in &resp.topics {
                if t.name != topic {
                    continue;
                }
                for p in &t.partitions {
                    if p.partition_index == partition {
                        return Ok(committed_or_none(p.committed_offset));
                    }
                }
            }
            return Ok(None);
        }

        // Grouped response: a topic entry is accepted when either its name or
        // its id is compatible with the request. An empty name or a default id
        // (on either side) counts as compatible, so the match is lenient.
        // NOTE(review): only one topic is requested, so leniency should be
        // harmless — confirm before reusing this for multi-topic requests.
        let unknown_id = WireUuid::default();
        for g in &resp.groups {
            for t in &g.topics {
                let name_matches = t.name.is_empty() || t.name == topic;
                let id_matches =
                    t.topic_id == topic_id || t.topic_id == unknown_id || topic_id == unknown_id;
                if !(name_matches || id_matches) {
                    continue;
                }
                for p in &t.partitions {
                    if p.partition_index == partition {
                        return Ok(committed_or_none(p.committed_offset));
                    }
                }
            }
        }
        Ok(None)
    }

    /// Returns the earliest offset of `topic`/`partition` as reported by
    /// `ListOffsets`.
    ///
    /// # Errors
    /// See `list_offset`.
    async fn earliest(&self, topic: &str, partition: i32) -> Result<i64, StreamsClientError> {
        self.list_offset(topic, partition, LIST_OFFSETS_EARLIEST)
            .await
    }

    /// Returns the latest offset of `topic`/`partition` as reported by
    /// `ListOffsets`.
    ///
    /// # Errors
    /// See `list_offset`.
    async fn latest(&self, topic: &str, partition: i32) -> Result<i64, StreamsClientError> {
        self.list_offset(topic, partition, LIST_OFFSETS_LATEST)
            .await
    }

    /// Commits `(topic, partition, offset)` triples for this store's group in
    /// a single OffsetCommit request.
    ///
    /// An empty slice is a no-op and sends nothing.
    ///
    /// # Errors
    /// Propagates topic-id resolution and request failures, and fails on the
    /// first partition whose response carries a non-zero error code.
    async fn commit(&self, offsets: &[(String, i32, i64)]) -> Result<(), StreamsClientError> {
        // Nothing to commit: skip the network round-trip entirely.
        if offsets.is_empty() {
            return Ok(());
        }

        // Group the flat triples per topic, as the request format requires.
        let mut by_topic: HashMap<String, Vec<(i32, i64)>> = HashMap::new();
        for (topic, partition, offset) in offsets {
            by_topic
                .entry(topic.clone())
                .or_default()
                .push((*partition, *offset));
        }

        let mut topics: Vec<OffsetCommitRequestTopic> = Vec::with_capacity(by_topic.len());
        for (name, parts) in by_topic {
            let topic_id = self.resolve_topic_id(&name).await?;
            let partitions = parts
                .into_iter()
                .map(
                    |(partition_index, committed_offset)| OffsetCommitRequestPartition {
                        partition_index,
                        committed_offset,
                        committed_leader_epoch: -1,
                        committed_metadata: Some(String::new()),
                        ..Default::default()
                    },
                )
                .collect();
            topics.push(OffsetCommitRequestTopic {
                name,
                topic_id,
                partitions,
                ..Default::default()
            });
        }

        // Generation -1 with an empty member id: the commit is not tied to a
        // group membership.
        let resp = self
            .client
            .send(OffsetCommitRequest {
                group_id: self.group_id.clone(),
                generation_id_or_member_epoch: -1,
                member_id: String::new(),
                topics,
                ..Default::default()
            })
            .await?;

        for t in &resp.topics {
            for p in &t.partitions {
                if p.error_code != 0 {
                    return Err(StreamsClientError::Runtime(format!(
                        "OffsetCommit error code {} for topic {} partition {}",
                        p.error_code, t.name, p.partition_index
                    )));
                }
            }
        }
        Ok(())
    }
}

impl BrokerOffsetStore {
    /// Sends a single-partition `ListOffsets` request with the given
    /// `timestamp` sentinel and returns the offset from the response.
    ///
    /// Only the first partition of the first topic in the response is read.
    ///
    /// # Errors
    /// Fails when the request fails, when the response holds no partition, or
    /// when that partition carries a non-zero error code.
    async fn list_offset(
        &self,
        topic: &str,
        partition: i32,
        timestamp: i64,
    ) -> Result<i64, StreamsClientError> {
        let resp = self
            .client
            .send(ListOffsetsRequest {
                replica_id: -1,
                topics: vec![ListOffsetsTopic {
                    name: topic.to_string(),
                    partitions: vec![ListOffsetsPartition {
                        partition_index: partition,
                        timestamp,
                        ..Default::default()
                    }],
                    ..Default::default()
                }],
                ..Default::default()
            })
            .await?;

        match resp.topics.first().and_then(|t| t.partitions.first()) {
            None => Err(StreamsClientError::Runtime(format!(
                "ListOffsets: no partition in response for {topic}/{partition}"
            ))),
            Some(p) if p.error_code != 0 => Err(StreamsClientError::Runtime(format!(
                "ListOffsets error code {} for {topic}/{partition}",
                p.error_code
            ))),
            Some(p) => Ok(p.offset),
        }
    }
}
pub(crate) async fn build(
bootstrap: &str,
group_id: &str,
client_id: &str,
) -> Result<(BrokerFetcher, Arc<BrokerProducer>, Arc<BrokerOffsetStore>), StreamsClientError> {
let metadata_client = Client::builder()
.bootstrap(bootstrap)
.client_id(client_id)
.build()
.await?;
let addr = tokio::net::lookup_host(bootstrap)
.await
.map_err(|e| {
StreamsClientError::Runtime(format!("failed to resolve bootstrap address: {e}"))
})?
.next()
.ok_or_else(|| {
StreamsClientError::Runtime(format!("no addresses resolved for bootstrap: {bootstrap}"))
})?;
let fetch_conn = Connection::connect_with_options(
addr,
ConnectionOptions {
client_id: client_id.to_string(),
..Default::default()
},
)
.await?;
let producer = Producer::builder()
.bootstrap(bootstrap)
.client_id(format!("{client_id}-producer"))
.enable_idempotence(true)
.acks(Acks::All)
.build()
.await
.map_err(|e| StreamsClientError::Runtime(e.to_string()))?;
let offset_client = Client::builder()
.bootstrap(bootstrap)
.client_id(format!("{client_id}-offsets"))
.build()
.await?;
let fetcher = BrokerFetcher {
conn: fetch_conn,
client: metadata_client,
topic_ids: Mutex::new(HashMap::new()),
max_wait_ms: 500,
partition_max_bytes: 1 << 20,
};
let broker_producer = Arc::new(BrokerProducer {
inner: producer,
pending: Mutex::new(Vec::new()),
});
let offset_store = Arc::new(BrokerOffsetStore::new(offset_client, group_id));
Ok((fetcher, broker_producer, offset_store))
}
pub(crate) async fn build_eos(
bootstrap: &str,
group_id: &str,
client_id: &str,
transactional_id: &str,
) -> Result<
(
BrokerFetcher,
Arc<BrokerTransactionalProducer>,
Arc<BrokerOffsetStore>,
),
StreamsClientError,
> {
let metadata_client = Client::builder()
.bootstrap(bootstrap)
.client_id(client_id)
.build()
.await?;
let addr = tokio::net::lookup_host(bootstrap)
.await
.map_err(|e| {
StreamsClientError::Runtime(format!("failed to resolve bootstrap address: {e}"))
})?
.next()
.ok_or_else(|| {
StreamsClientError::Runtime(format!("no addresses resolved for bootstrap: {bootstrap}"))
})?;
let fetch_conn = Connection::connect_with_options(
addr,
ConnectionOptions {
client_id: client_id.to_string(),
..Default::default()
},
)
.await?;
let producer = Producer::builder()
.bootstrap(bootstrap)
.client_id(format!("{client_id}-producer"))
.enable_idempotence(true)
.acks(Acks::All)
.transactional_id(transactional_id.to_string())
.build()
.await
.map_err(|e| StreamsClientError::Runtime(e.to_string()))?;
let offset_client = Client::builder()
.bootstrap(bootstrap)
.client_id(format!("{client_id}-offsets"))
.build()
.await?;
let fetcher = BrokerFetcher {
conn: fetch_conn,
client: metadata_client,
topic_ids: Mutex::new(HashMap::new()),
max_wait_ms: 500,
partition_max_bytes: 1 << 20,
};
let txn_producer = Arc::new(BrokerTransactionalProducer { inner: producer });
let offset_store = Arc::new(BrokerOffsetStore::new(offset_client, group_id));
Ok((fetcher, txn_producer, offset_store))
}
#[cfg(test)]
mod tests {
    use super::BrokerOffsetStore;
    use crate::runtime::io::OffsetStore as _;
    use crabka_broker::{Broker, BrokerConfig};
    use crabka_client_core::Client;
    use crabka_protocol::owned::create_topics_request::{CreatableTopic, CreateTopicsRequest};

    /// Starts a broker on a fresh temporary directory.
    ///
    /// Returns the broker handle, its listen address, and the temp dir; the
    /// caller keeps all three bound for the duration of the test.
    async fn boot() -> (crabka_broker::BrokerHandle, String, tempfile::TempDir) {
        let data_dir = tempfile::TempDir::new().unwrap();
        let config = BrokerConfig::for_tests(data_dir.path().to_path_buf());
        let handle = Broker::start(config).await.unwrap();
        let listen = handle.listen_addr().to_string();
        (handle, listen, data_dir)
    }

    /// Creates `topic` with `partitions` partitions and replication factor 1,
    /// panicking if the request fails or the broker reports an error code.
    async fn create_topic(client: &Client, topic: &str, partitions: i32) {
        let request = CreateTopicsRequest {
            topics: vec![CreatableTopic {
                name: topic.into(),
                num_partitions: partitions,
                replication_factor: 1,
                ..Default::default()
            }],
            timeout_ms: 5_000,
            ..Default::default()
        };
        let resp = client.send(request).await.expect("CreateTopics");
        assert_eq!(
            resp.topics[0].error_code, 0,
            "topic create failed: {resp:?}"
        );
    }

    /// A committed offset is absent before the first commit and is read back
    /// unchanged afterwards.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn offset_store_commits_and_reads_back() {
        let (_broker, bootstrap, _dir) = boot().await;
        let topic = "ostore-topic";

        let admin = Client::builder()
            .bootstrap(&bootstrap)
            .client_id("ostore-admin")
            .build()
            .await
            .unwrap();
        create_topic(&admin, topic, 1).await;

        let offset_client = Client::builder()
            .bootstrap(&bootstrap)
            .client_id("ostore-client")
            .build()
            .await
            .unwrap();
        let store = BrokerOffsetStore::new(offset_client, "ostore-grp");

        let initially = store.committed(topic, 0).await.unwrap();
        assert_eq!(
            initially, None,
            "expected no committed offset before first commit"
        );

        let to_commit = [(topic.to_string(), 0, 42)];
        store.commit(&to_commit).await.unwrap();

        let read_back = store.committed(topic, 0).await.unwrap();
        assert_eq!(
            read_back,
            Some(42),
            "expected committed offset 42 after commit"
        );
    }
}