#![allow(dead_code)]
use std::sync::Arc;
use crabka_metadata::{MetadataRecord, PartitionRecord, TopicRecord};
use crabka_raft::RaftError;
use uuid::Uuid;
/// Name of the topic that [`ensure_topic`] looks up and, when absent, creates.
pub const TOPIC: &str = "__transaction_state";
/// Number of partitions [`ensure_topic`] declares for [`TOPIC`]; one
/// partition record is emitted for every index in `0..NUM_PARTITIONS`.
pub const NUM_PARTITIONS: i32 = 50;
/// Makes sure the [`TOPIC`] topic is present in the metadata image, submitting
/// the records that create it when it is missing.
///
/// The function is a no-op when the current image already contains the topic.
/// Otherwise it builds one topic record plus [`NUM_PARTITIONS`] partition
/// records and hands them to `controller.submit_change`.
///
/// Replica placement: registered broker ids are sorted ascending, the
/// replication factor is `min(broker_count, 3)`, and partition `p` takes the
/// `rf` consecutive ids starting at position `p % broker_count` (wrapping
/// around). The first replica is recorded as leader and the initial ISR is the
/// full replica set.
///
/// # Errors
///
/// Returns `BrokerError::Txn` when no brokers are registered, or when
/// `submit_change` fails with anything other than a `TopicExists` metadata
/// error. `TopicExists` is treated as success (presumably another node created
/// the topic in the meantime — confirm against the controller semantics).
///
/// # Panics
///
/// Panics only if the internal integer conversions fail, which the bounds
/// (`rf <= 3`, non-negative partition index) rule out.
pub(crate) async fn ensure_topic(
    controller: &Arc<dyn crate::metadata_source::MetadataSource>,
) -> Result<(), crate::error::BrokerError> {
    let image = controller.current_image();
    if image.topic(TOPIC).is_some() {
        // Already bootstrapped; nothing to submit.
        return Ok(());
    }

    let mut broker_ids: Vec<u64> = image.brokers().map(|b| b.node_id).collect();
    if broker_ids.is_empty() {
        return Err(crate::error::BrokerError::Txn(
            "no brokers registered; cannot bootstrap __transaction_state".into(),
        ));
    }
    // Sorting makes the assignment depend only on the set of ids, not on the
    // order in which the image yields brokers.
    broker_ids.sort_unstable();

    let broker_count = broker_ids.len();
    let replica_count = broker_count.min(3);
    let replication_factor =
        i16::try_from(replica_count).expect("rf <= 3 always fits in i16");

    let topic_id = Uuid::new_v4();
    let topic_record = MetadataRecord::V1Topic(TopicRecord {
        name: TOPIC.to_string(),
        topic_id,
        partitions: NUM_PARTITIONS,
        replication_factor,
    });

    let partition_records = (0..NUM_PARTITIONS).map(|partition| {
        let offset = usize::try_from(partition).expect("partition index fits in usize");
        // Round-robin window: `replica_count` consecutive brokers starting at
        // `offset % broker_count`, wrapping past the end of the sorted list.
        let replicas: Vec<u64> = broker_ids
            .iter()
            .cycle()
            .skip(offset % broker_count)
            .take(replica_count)
            .copied()
            .collect();
        let leader = replicas[0];
        let isr = replicas.clone();
        MetadataRecord::V1Partition(PartitionRecord {
            topic: TOPIC.to_string(),
            partition,
            leader,
            replicas,
            isr,
            leader_epoch: 0,
            adding_replicas: vec![],
            removing_replicas: vec![],
            directories: vec![],
            partition_epoch: 0,
        })
    });

    // Topic record first, then the partitions in ascending index order.
    let records: Vec<MetadataRecord> = std::iter::once(topic_record)
        .chain(partition_records)
        .collect();

    let outcome = controller.submit_change(records).await;
    match outcome {
        Ok(()) => Ok(()),
        // The topic already existing is the state we wanted anyway.
        Err(RaftError::Metadata(crabka_metadata::MetadataError::TopicExists(_))) => Ok(()),
        Err(e) => Err(crate::error::BrokerError::Txn(format!(
            "submit_change failed: {e}"
        ))),
    }
}