use std::sync::Arc;
use crabka_metadata::NodeId;
use crabka_protocol::owned::delete_share_group_state_request::{
DeleteShareGroupStateRequest, DeleteStateData, PartitionData as DeletePartitionData,
};
use crabka_protocol::owned::initialize_share_group_state_request::{
InitializeShareGroupStateRequest, InitializeStateData, PartitionData as InitPartitionData,
};
use crabka_protocol::owned::read_share_group_state_request::{
PartitionData as ReadPartitionData, ReadShareGroupStateRequest, ReadStateData,
};
use crabka_protocol::owned::write_share_group_state_request::{
PartitionData as WritePartitionData, StateBatch as ProtoStateBatch,
WriteShareGroupStateRequest, WriteStateData,
};
use crabka_protocol::primitives::uuid::Uuid as ProtoUuid;
use crabka_security::ListenerProtocol;
use crate::error::BrokerError;
use crate::metadata_source::MetadataSource;
use crate::network::client::InterBrokerClient;
use crate::share_coordinator::bootstrap;
use crate::share_coordinator::coordinator::ShareCoordinator;
use crate::share_coordinator::persistence::StateBatch;
use crate::share_coordinator::state::SharePartitionState;
/// Routes share-group state operations (initialize / read / write / delete)
/// either to the local `ShareCoordinator`, when this node leads the owning
/// share-state partition, or over an inter-broker connection to the broker
/// that the metadata image lists as that partition's leader.
pub(crate) struct SharePersister {
/// This broker's node id; embedded in the `client_id` of outbound
/// share-state connections and shown in the `Debug` output.
node_id: NodeId,
/// Local coordinator: maps a share partition to its state partition,
/// answers leadership queries, and serves requests this node leads.
share_coordinator: Arc<ShareCoordinator>,
/// Source of the current metadata image, used to refresh leadership and to
/// look up the leader broker of a state partition.
controller: Arc<dyn MetadataSource>,
/// Client used to open connections to the leader broker.
inter_broker_client: Arc<InterBrokerClient>,
/// Listener protocol handed to the client when connecting to the leader.
inter_broker_listener_protocol: ListenerProtocol,
/// Name of the endpoint to pick on the leader broker; if no endpoint has
/// this name, the broker's top-level host/port is used instead.
inter_broker_listener_name: String,
}
impl std::fmt::Debug for SharePersister {
    /// Prints only `node_id`; every other field is elided and the output is
    /// marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("SharePersister");
        builder.field("node_id", &self.node_id);
        builder.finish_non_exhaustive()
    }
}
impl SharePersister {
pub(crate) fn new(
node_id: NodeId,
share_coordinator: Arc<ShareCoordinator>,
controller: Arc<dyn MetadataSource>,
inter_broker_client: Arc<InterBrokerClient>,
inter_broker_listener_protocol: ListenerProtocol,
inter_broker_listener_name: String,
) -> Self {
Self {
node_id,
share_coordinator,
controller,
inter_broker_client,
inter_broker_listener_protocol,
inter_broker_listener_name,
}
}
pub(crate) async fn initialize(
&self,
group: &str,
topic_id: uuid::Uuid,
partition: i32,
state_epoch: i32,
start_offset: i64,
) -> Result<(), BrokerError> {
self.ensure_topic_and_refresh().await?;
let state_partition = self
.share_coordinator
.state_partition_for(group, &topic_id, partition);
if self.share_coordinator.is_leader(state_partition).await {
return self
.share_coordinator
.initialize(group, topic_id, partition, state_epoch, start_offset)
.await
.map_err(|code| {
BrokerError::Share(format!(
"InitializeShareGroupState {group}:{topic_id}:{partition} fenced (code {code})"
))
});
}
let req = InitializeShareGroupStateRequest {
group_id: group.to_string(),
topics: vec![InitializeStateData {
topic_id: ProtoUuid(*topic_id.as_bytes()),
partitions: vec![InitPartitionData {
partition,
state_epoch,
start_offset,
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
self.send_to_leader(state_partition, req).await
}
pub(crate) async fn delete(
&self,
group: &str,
topic_id: uuid::Uuid,
partition: i32,
) -> Result<(), BrokerError> {
self.ensure_topic_and_refresh().await?;
let state_partition = self
.share_coordinator
.state_partition_for(group, &topic_id, partition);
if self.share_coordinator.is_leader(state_partition).await {
return self
.share_coordinator
.delete(group, topic_id, partition)
.await
.map_err(|code| {
BrokerError::Share(format!(
"DeleteShareGroupState {group}:{topic_id}:{partition} failed (code {code})"
))
});
}
let req = DeleteShareGroupStateRequest {
group_id: group.to_string(),
topics: vec![DeleteStateData {
topic_id: ProtoUuid(*topic_id.as_bytes()),
partitions: vec![DeletePartitionData {
partition,
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
self.send_to_leader(state_partition, req).await
}
async fn ensure_topic_and_refresh(&self) -> Result<(), BrokerError> {
bootstrap::ensure_topic(&self.controller).await?;
self.share_coordinator
.refresh_leader_partitions(&self.controller.current_image())
.await;
Ok(())
}
pub(crate) async fn read_state(
&self,
group: &str,
topic_id: uuid::Uuid,
partition: i32,
) -> Result<Option<SharePartitionState>, BrokerError> {
self.ensure_topic_and_refresh().await?;
let state_partition = self
.share_coordinator
.state_partition_for(group, &topic_id, partition);
if self.share_coordinator.is_leader(state_partition).await {
return Ok(self
.share_coordinator
.read(group, topic_id, partition)
.await);
}
let req = ReadShareGroupStateRequest {
group_id: group.to_string(),
topics: vec![ReadStateData {
topic_id: ProtoUuid(*topic_id.as_bytes()),
partitions: vec![ReadPartitionData {
partition,
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
let resp = self.send_to_leader_resp(state_partition, req).await?;
let part_result = resp
.results
.into_iter()
.flat_map(|t| t.partitions)
.find(|p| p.partition == partition);
let Some(pr) = part_result else {
return Ok(None);
};
if pr.error_code != 0 {
return Ok(None);
}
Ok(Some(SharePartitionState {
state_epoch: pr.state_epoch,
leader_epoch: 0,
start_offset: pr.start_offset,
delivery_complete_count: 0,
state_batches: pr
.state_batches
.into_iter()
.map(|b| StateBatch {
first_offset: b.first_offset,
last_offset: b.last_offset,
delivery_state: b.delivery_state,
delivery_count: b.delivery_count,
})
.collect(),
snapshot_epoch: 0,
last_snapshot_offset: 0,
updates_since_snapshot: 0,
}))
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn write_state(
&self,
group: &str,
topic_id: uuid::Uuid,
partition: i32,
state_epoch: i32,
leader_epoch: i32,
start_offset: i64,
delivery_complete_count: i32,
batches: Vec<StateBatch>,
) -> Result<(), BrokerError> {
self.ensure_topic_and_refresh().await?;
let state_partition = self
.share_coordinator
.state_partition_for(group, &topic_id, partition);
if self.share_coordinator.is_leader(state_partition).await {
return self
.share_coordinator
.write(
group,
topic_id,
partition,
state_epoch,
leader_epoch,
start_offset,
delivery_complete_count,
batches,
)
.await
.map_err(|code| {
BrokerError::Share(format!(
"WriteShareGroupState {group}:{topic_id}:{partition} fenced (code {code})"
))
});
}
let req = WriteShareGroupStateRequest {
group_id: group.to_string(),
topics: vec![WriteStateData {
topic_id: ProtoUuid(*topic_id.as_bytes()),
partitions: vec![WritePartitionData {
partition,
state_epoch,
leader_epoch,
start_offset,
delivery_complete_count,
state_batches: batches
.into_iter()
.map(|b| ProtoStateBatch {
first_offset: b.first_offset,
last_offset: b.last_offset,
delivery_state: b.delivery_state,
delivery_count: b.delivery_count,
..Default::default()
})
.collect(),
..Default::default()
}],
..Default::default()
}],
..Default::default()
};
self.send_to_leader(state_partition, req).await
}
async fn connect_to_leader(
&self,
state_partition: i32,
) -> Result<crabka_client_core::Connection, BrokerError> {
let image = self.controller.current_image();
let pr = image
.partition(bootstrap::TOPIC, state_partition)
.ok_or_else(|| {
BrokerError::Share(format!(
"{}-{state_partition} not present in metadata image",
bootstrap::TOPIC
))
})?;
let leader = pr.leader;
let broker_info = image.broker(leader).ok_or_else(|| {
BrokerError::Share(format!(
"share-state leader node {leader} not in metadata image"
))
})?;
let (host, port) = broker_info
.endpoints
.iter()
.find(|e| e.name == self.inter_broker_listener_name)
.map_or_else(
|| (broker_info.host.clone(), broker_info.port),
|e| (e.host.clone(), e.port),
);
let opts = crabka_client_core::ConnectionOptions {
client_id: format!("crabka-broker-share-{}", self.node_id),
..crabka_client_core::ConnectionOptions::default()
};
self.inter_broker_client
.connect_as_connection(
&host,
port,
self.inter_broker_listener_protocol,
"localhost",
opts,
)
.await
.map_err(|e| BrokerError::Share(format!("share-state connect to {host}:{port}: {e}")))
}
async fn send_to_leader<R>(&self, state_partition: i32, req: R) -> Result<(), BrokerError>
where
R: crabka_client_core::ProtocolRequest,
{
let conn = self.connect_to_leader(state_partition).await?;
let _resp = conn
.send(req)
.await
.map_err(|e| BrokerError::Share(format!("share-state RPC: {e}")))?;
conn.close();
Ok(())
}
async fn send_to_leader_resp<R>(
&self,
state_partition: i32,
req: R,
) -> Result<R::Response, BrokerError>
where
R: crabka_client_core::ProtocolRequest,
{
let conn = self.connect_to_leader(state_partition).await?;
let resp = conn
.send(req)
.await
.map_err(|e| BrokerError::Share(format!("share-state RPC: {e}")))?;
conn.close();
Ok(resp)
}
}