use bytes::{Bytes, BytesMut};
use crabka_metadata::{AclOperation, ResourceType};
use crabka_protocol::owned::txn_offset_commit_request::TxnOffsetCommitRequest;
use crabka_protocol::owned::txn_offset_commit_response::{
TxnOffsetCommitResponse, TxnOffsetCommitResponsePartition, TxnOffsetCommitResponseTopic,
};
use crabka_protocol::records::{Attributes, Record, RecordBatch};
use crabka_protocol::{Decode, Encode};
use crate::authorizer::{AuthorizationRequest, AuthorizationResult, authorize_topics};
use crate::broker::Broker;
use crate::codes;
use crate::coordinator::bootstrap::{OFFSETS_PARTITION, OFFSETS_TOPIC};
use crate::coordinator::persistence::OffsetCommitValue;
use crate::coordinator::unified::actor::{GroupKindTag, validate_group_commit};
use crate::coordinator::unified::classic_state::OffsetEntry;
use crate::coordinator::unified::streams::actor::validate_streams_group_commit;
use crate::error::BrokerError;
use crate::txn::coordinator::OffsetKey;
use crate::txn::util::now_millis;
pub(crate) async fn handle(
broker: &Broker,
version: i16,
_correlation_id: i32,
req_bytes: &[u8],
ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
let partitions = broker.partitions.clone();
let mut cur: &[u8] = req_bytes;
let req = TxnOffsetCommitRequest::decode(&mut cur, version)?;
{
let image = broker.controller.current_image();
let authorizer = broker.config.authorizer.as_ref();
let tid_req = AuthorizationRequest {
principal: ctx.principal,
host: ctx.peer,
resource_type: ResourceType::TransactionalId,
resource_name: req.transactional_id.as_str(),
operation: AclOperation::Write,
};
if authorizer.authorize(&*image, &tid_req) == AuthorizationResult::Deny {
return encode_err_all(version, &req, codes::TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
}
let group_req = AuthorizationRequest {
principal: ctx.principal,
host: ctx.peer,
resource_type: ResourceType::Group,
resource_name: req.group_id.as_str(),
operation: AclOperation::Read,
};
if authorizer.authorize(&*image, &group_req) == AuthorizationResult::Deny {
return encode_err_all(version, &req, codes::GROUP_AUTHORIZATION_FAILED);
}
}
let topic_decisions = {
let image = broker.controller.current_image();
let topic_names: Vec<&str> = req.topics.iter().map(|t| t.name.as_str()).collect();
authorize_topics(
broker.config.authorizer.as_ref(),
&*image,
ctx.principal,
ctx.peer,
AclOperation::Read,
topic_names,
)
};
let denied_topics: std::collections::HashSet<String> = topic_decisions
.into_iter()
.filter_map(|(name, r)| {
if r == AuthorizationResult::Deny {
Some(name.to_string())
} else {
None
}
})
.collect();
let handle = broker
.group_coordinator
.find(&req.group_id)
.unwrap_or_else(|| {
broker
.group_coordinator
.get_or_create_group(&req.group_id, GroupKindTag::Classic)
});
if version >= 3 {
let code = if let Some(streams) = broker.group_coordinator.find_streams(&req.group_id) {
validate_streams_group_commit(&streams, &req.member_id, req.generation_id).await
} else {
validate_group_commit(
&handle,
&req.member_id,
req.generation_id,
req.group_instance_id.as_deref(),
)
.await
};
if let Some(code) = code {
return encode_err_all(version, &req, code);
}
}
let now_ms = now_millis();
let buffered = match append_txn_batch(&req, &partitions, now_ms, &denied_topics).await {
Ok(entries) => entries,
Err(code) => {
return encode_resp(version, &build_response(&req, code, &denied_topics));
}
};
broker
.txn_coordinator
.buffer_txn_offsets(req.producer_id, &req.group_id, buffered);
encode_resp(version, &build_response(&req, codes::NONE, &denied_topics))
}
async fn append_txn_batch(
req: &TxnOffsetCommitRequest,
partitions: &std::sync::Arc<crate::partition_registry::PartitionRegistry>,
now_ms: i64,
denied_topics: &std::collections::HashSet<String>,
) -> Result<Vec<(OffsetKey, OffsetEntry)>, i16> {
let mut batch = RecordBatch {
attributes: Attributes::default().with_transactional(true),
max_timestamp: now_ms,
producer_id: req.producer_id,
producer_epoch: req.producer_epoch,
..RecordBatch::default()
};
let mut entries: Vec<(OffsetKey, OffsetEntry)> = Vec::new();
let mut delta: i32 = 0;
for topic in &req.topics {
if denied_topics.contains(&topic.name) {
continue;
}
for part in &topic.partitions {
let value = OffsetCommitValue {
offset: part.committed_offset,
leader_epoch: part.committed_leader_epoch,
metadata: part.committed_metadata.clone().unwrap_or_default(),
commit_timestamp_ms: now_ms,
};
batch.records.push(Record {
offset_delta: delta,
timestamp_delta: 0,
key: Some(OffsetCommitValue::encode_key(
&req.group_id,
&topic.name,
part.partition_index,
)),
value: Some(value.encode_value()),
..Default::default()
});
entries.push((
(topic.name.clone(), part.partition_index),
OffsetEntry {
offset: value.offset,
leader_epoch: value.leader_epoch,
metadata: value.metadata.clone(),
commit_timestamp_ms: value.commit_timestamp_ms,
},
));
delta += 1;
}
}
if batch.records.is_empty() {
return Ok(entries);
}
batch.last_offset_delta = (delta - 1).max(0);
let Some(part_handle) = partitions.get(OFFSETS_TOPIC, OFFSETS_PARTITION) else {
return Err(codes::NOT_COORDINATOR);
};
part_handle
.produce_batch(batch)
.await
.map(|_| entries)
.map_err(|e| {
tracing::error!(
group = %req.group_id,
tid = %req.transactional_id,
error = %e,
"TxnOffsetCommit: produce_batch failed"
);
codes::UNKNOWN_SERVER_ERROR
})
}
fn build_response(
req: &TxnOffsetCommitRequest,
code: i16,
denied_topics: &std::collections::HashSet<String>,
) -> TxnOffsetCommitResponse {
let topics = req
.topics
.iter()
.map(|t| {
let row_code = if denied_topics.contains(&t.name) {
codes::TOPIC_AUTHORIZATION_FAILED
} else {
code
};
TxnOffsetCommitResponseTopic {
name: t.name.clone(),
partitions: t
.partitions
.iter()
.map(|p| TxnOffsetCommitResponsePartition {
partition_index: p.partition_index,
error_code: row_code,
..Default::default()
})
.collect(),
..Default::default()
}
})
.collect();
TxnOffsetCommitResponse {
throttle_time_ms: 0,
topics,
..Default::default()
}
}
fn encode_resp(version: i16, resp: &TxnOffsetCommitResponse) -> Result<Bytes, BrokerError> {
let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
resp.encode(&mut buf, version)?;
Ok(buf.freeze())
}
fn encode_err_all(
version: i16,
req: &TxnOffsetCommitRequest,
code: i16,
) -> Result<Bytes, BrokerError> {
let empty: std::collections::HashSet<String> = std::collections::HashSet::new();
encode_resp(version, &build_response(req, code, &empty))
}