use std::time::Instant;
use bytes::{Bytes, BytesMut};
use crabka_metadata::{AclOperation, ResourceType};
use crabka_protocol::owned::share_acknowledge_request::ShareAcknowledgeRequest;
use crabka_protocol::owned::share_acknowledge_response::{
LeaderIdAndEpoch, PartitionData, ShareAcknowledgeResponse, ShareAcknowledgeTopicResponse,
};
use crabka_protocol::{Decode, Encode};
use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
use crate::handlers::share_fetch::apply_one_ack;
/// Handles one `ShareAcknowledge` request and returns the encoded response.
///
/// Processing order:
/// 1. Decode the request from `req_bytes` at the given protocol `version`.
/// 2. If the share-group config has `enable == false`, answer with a top-level
///    `UNSUPPORTED_VERSION` error and no topic data.
/// 3. Validate the share session for the (group, member, epoch) triple; a failure
///    is returned as the top-level error code.
/// 4. For every topic, resolve its name from the topic id and authorize a `Read`
///    on it. Topics whose name cannot be resolved yield
///    `UNKNOWN_TOPIC_OR_PARTITION` for each partition; topics denied by the
///    authorizer yield `TOPIC_AUTHORIZATION_FAILED`.
/// 5. For every remaining partition, reject with `NOT_LEADER_OR_FOLLOWER` (plus
///    the current leader id/epoch) when this broker is not the leader; otherwise
///    lock the share-partition state, apply each acknowledgement batch (or renew
///    it when `is_renew_ack` is set) and persist the state if it became dirty.
///
/// When several batches of one partition fail, the partition reports the error
/// code of the last failing batch.
/// NOTE(review): confirm that "last error wins" (rather than first) is intended.
///
/// # Errors
///
/// Returns a `BrokerError` when the request cannot be decoded or the response
/// cannot be encoded. Protocol-level failures are reported inside the response.
// The topic -> partition -> batch loops share their local state, so the handler
// is kept as a single function despite its length.
#[allow(clippy::too_many_lines)]
pub(crate) async fn handle(
    broker: &Broker,
    version: i16,
    _correlation_id: i32,
    req_bytes: &[u8],
    ctx: &crate::handlers::RequestContext<'_>,
) -> Result<Bytes, BrokerError> {
    let mut remaining: &[u8] = req_bytes;
    let request = ShareAcknowledgeRequest::decode(&mut remaining, version)?;

    let share_cfg = broker.config.share_group.clone();
    let lock_duration = share_cfg.record_lock_duration;
    // Saturate rather than fail if the configured duration exceeds `i32` milliseconds.
    let lock_timeout_ms = i32::try_from(lock_duration.as_millis()).unwrap_or(i32::MAX);
    if !share_cfg.enable {
        return encode_error_response(version, codes::UNSUPPORTED_VERSION, lock_timeout_ms);
    }

    let group_id = request.group_id.clone().unwrap_or_default();
    let member_id = request.member_id.clone().unwrap_or_default();
    if let Err(code) = broker.share_partition_leaders.validate_session(
        &group_id,
        &member_id,
        request.share_session_epoch,
    ) {
        return encode_error_response(version, code, lock_timeout_ms);
    }

    let leaders = broker.share_partition_leaders.clone();
    let metadata_image = broker.controller.current_image();
    // A single timestamp is used for every batch in this request.
    let received_at = Instant::now();
    let renewing = request.is_renew_ack;
    let mut topic_responses: Vec<ShareAcknowledgeTopicResponse> =
        Vec::with_capacity(request.topics.len());

    for topic in &request.topics {
        let topic_id = uuid::Uuid::from_bytes(topic.topic_id.0);
        let resolved_name = leaders.topic_name_for(topic_id);

        // `Some(code)` means every partition of this topic is rejected with `code`.
        let topic_rejection = match resolved_name.as_deref() {
            None => Some(codes::UNKNOWN_TOPIC_OR_PARTITION),
            Some(name) => {
                let verdict = broker.config.authorizer.authorize(
                    &*metadata_image,
                    &AuthorizationRequest {
                        principal: ctx.principal,
                        host: ctx.peer,
                        resource_type: ResourceType::Topic,
                        resource_name: name,
                        operation: AclOperation::Read,
                    },
                );
                if verdict == AuthorizationResult::Deny {
                    Some(codes::TOPIC_AUTHORIZATION_FAILED)
                } else {
                    None
                }
            }
        };

        let mut partition_results: Vec<PartitionData> =
            Vec::with_capacity(topic.partitions.len());
        for partition in &topic.partitions {
            let partition_index = partition.partition_index;
            let mut result = PartitionData {
                partition_index,
                ..Default::default()
            };

            if let Some(code) = topic_rejection {
                result.error_code = code;
                partition_results.push(result);
                continue;
            }

            if !leaders.topic_leader_is_self(topic_id, partition_index) {
                // Tell the client where the partition currently lives.
                let (leader_id, leader_epoch) =
                    leaders.current_leader_of(topic_id, partition_index);
                result.error_code = codes::NOT_LEADER_OR_FOLLOWER;
                result.current_leader = LeaderIdAndEpoch {
                    leader_id,
                    leader_epoch,
                    ..Default::default()
                };
                partition_results.push(result);
                continue;
            }

            let state_cell = leaders
                .get_or_load(&group_id, topic_id, partition_index)
                .await;
            // The guard is held until the state has been persisted below.
            let mut state = state_cell.lock().await;
            let mut partition_error = codes::NONE;
            for batch in &partition.acknowledgement_batches {
                let outcome = if renewing {
                    state.renew(
                        &member_id,
                        batch.first_offset,
                        batch.last_offset,
                        received_at,
                        lock_duration,
                    )
                } else {
                    apply_one_ack(
                        &mut state,
                        &member_id,
                        batch.first_offset,
                        batch.last_offset,
                        &batch.acknowledge_types,
                        received_at,
                    )
                };
                // Remaining batches are still processed after a failure; a later
                // failure overwrites an earlier one.
                match outcome {
                    Ok(_) => {}
                    Err(code) => partition_error = code,
                }
            }
            result.error_code = partition_error;
            leaders
                .persist_if_dirty(&group_id, topic_id, partition_index, &mut state)
                .await;
            partition_results.push(result);
        }

        topic_responses.push(ShareAcknowledgeTopicResponse {
            topic_id: topic.topic_id,
            partitions: partition_results,
            ..Default::default()
        });
    }

    let response = ShareAcknowledgeResponse {
        responses: topic_responses,
        acquisition_lock_timeout_ms: lock_timeout_ms,
        error_code: codes::NONE,
        error_message: None,
        throttle_time_ms: 0,
        ..Default::default()
    };
    let mut encoded = BytesMut::with_capacity(response.encoded_len(version));
    response.encode(&mut encoded, version)?;
    Ok(encoded.freeze())
}
/// Encodes a `ShareAcknowledgeResponse` that carries only a top-level error.
///
/// The response has `error_code` set to the given code, no error message, an
/// empty topic list, a zero throttle time, and `lock_timeout_ms` as the
/// acquisition lock timeout. It is serialized at protocol `version`.
///
/// # Errors
///
/// Returns a `BrokerError` if encoding the response fails.
fn encode_error_response(
    version: i16,
    error_code: i16,
    lock_timeout_ms: i32,
) -> Result<Bytes, BrokerError> {
    let failure = ShareAcknowledgeResponse {
        error_code,
        error_message: None,
        responses: Vec::new(),
        acquisition_lock_timeout_ms: lock_timeout_ms,
        throttle_time_ms: 0,
        ..Default::default()
    };
    // Size the buffer up front from the encoded length.
    let capacity = failure.encoded_len(version);
    let mut encoded = BytesMut::with_capacity(capacity);
    failure.encode(&mut encoded, version)?;
    Ok(encoded.freeze())
}