use bytes::{Bytes, BytesMut};
use futures_util::future::BoxFuture;
use crabka_protocol::Decode;
use crabka_protocol::Encode;
use crabka_protocol::owned::write_txn_markers_request::WriteTxnMarkersRequest;
use crabka_protocol::owned::write_txn_markers_response::{
WritableTxnMarkerPartitionResult, WritableTxnMarkerResult, WritableTxnMarkerTopicResult,
WriteTxnMarkersResponse,
};
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
use crate::txn::marker::{MarkerType, build_marker_batch};
pub(crate) fn handle(
broker: &Broker,
version: i16,
_correlation_id: i32,
req_bytes: &[u8],
) -> BoxFuture<'static, Result<Bytes, BrokerError>> {
let req_bytes = req_bytes.to_vec();
let partitions = broker.partitions.clone();
Box::pin(async move {
let mut cur: &[u8] = &req_bytes;
let req = WriteTxnMarkersRequest::decode(&mut cur, version)?;
let mut marker_results: Vec<WritableTxnMarkerResult> = Vec::new();
for marker_entry in &req.markers {
let marker_type = if marker_entry.transaction_result {
MarkerType::Commit
} else {
MarkerType::Abort
};
let pid = marker_entry.producer_id;
let epoch = marker_entry.producer_epoch;
let mut topic_results: Vec<WritableTxnMarkerTopicResult> = Vec::new();
for topic in &marker_entry.topics {
let mut partition_results: Vec<WritableTxnMarkerPartitionResult> = Vec::new();
for &p in &topic.partition_indexes {
let error_code = match partitions.get(&topic.name, p) {
None => {
tracing::debug!(
topic = %topic.name,
partition = p,
"WriteTxnMarkers: partition not local; returning NOT_LEADER_OR_FOLLOWER"
);
codes::NOT_LEADER_OR_FOLLOWER
}
Some(part) => {
let base_offset = part.log_end_offset();
let marker = build_marker_batch(pid, epoch, base_offset, marker_type);
match part.produce_batch(marker).await {
Ok(_) => codes::NONE,
Err(e) => {
tracing::warn!(
topic = %topic.name,
partition = p,
error = %e,
"WriteTxnMarkers: produce_batch failed"
);
codes::UNKNOWN_SERVER_ERROR
}
}
}
};
partition_results.push(WritableTxnMarkerPartitionResult {
partition_index: p,
error_code,
..Default::default()
});
}
topic_results.push(WritableTxnMarkerTopicResult {
name: topic.name.clone(),
partitions: partition_results,
..Default::default()
});
}
marker_results.push(WritableTxnMarkerResult {
producer_id: pid,
topics: topic_results,
..Default::default()
});
}
let resp = WriteTxnMarkersResponse {
markers: marker_results,
..Default::default()
};
let mut buf = BytesMut::with_capacity(resp.encoded_len(version));
resp.encode(&mut buf, version)?;
Ok(buf.freeze())
})
}