#![allow(dead_code)]
use std::net::SocketAddr;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures_util::{SinkExt, StreamExt};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use tracing::Instrument as _;
use crate::broker::Broker;
use crate::codes;
use crate::error::BrokerError;
use crate::network::codec::{self, MAX_FRAME_BYTES};
const API_VERSIONS_KEY: i16 = 18;
static ANONYMOUS_PRINCIPAL: std::sync::LazyLock<crabka_security::Principal> =
std::sync::LazyLock::new(|| crabka_security::Principal {
name: "ANONYMOUS".to_string(),
auth_method: crabka_security::AuthMethod::Anonymous,
groups: vec![],
});
fn principal_or_anonymous(
auth: &crate::network::auth::ConnectionAuth,
) -> &crabka_security::Principal {
auth.principal().unwrap_or(&ANONYMOUS_PRINCIPAL)
}
async fn sleep_until_some(deadline: Option<tokio::time::Instant>) {
match deadline {
Some(t) => tokio::time::sleep_until(t).await,
None => std::future::pending::<()>().await,
}
}
fn instant_at_epoch_ms(epoch_ms: i64) -> tokio::time::Instant {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0_i64, |d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX));
let delta_ms = (epoch_ms - now_ms).max(0);
tokio::time::Instant::now() + std::time::Duration::from_millis(delta_ms.cast_unsigned())
}
fn auth_principal_name(auth: &crate::network::auth::ConnectionAuth) -> Option<&str> {
match auth {
crate::network::auth::ConnectionAuth::Authenticated { principal, .. } => {
Some(principal.name.as_str())
}
crate::network::auth::ConnectionAuth::Reauthenticating { previous, .. } => {
Some(previous.principal.name.as_str())
}
_ => None,
}
}
pub async fn serve_connection_on_listener(
broker: std::sync::Arc<Broker>,
stream: TcpStream,
spec: crate::config::ListenerSpec,
) {
let peer = stream.peer_addr().unwrap_or_else(|e| {
tracing::debug!(error = %e, "peer_addr() failed, using 0.0.0.0:0");
SocketAddr::from(([0u8, 0, 0, 0], 0))
});
if spec.protocol.requires_tls() {
let acceptor = if let Some(per_tls) = spec.tls_config.as_ref() {
match per_tls.build_server_config() {
Ok(sc) => tokio_rustls::TlsAcceptor::from(sc),
Err(e) => {
tracing::error!(
listener = %spec.name,
error = %e,
"failed to build TlsAcceptor from per-listener tls_config"
);
return;
}
}
} else {
let Some(dynamic) = broker.tls_dynamic.as_ref() else {
tracing::error!(
listener = %spec.name,
"TLS listener without per-listener tls_config and no broker-wide tls_dynamic"
);
return;
};
tokio_rustls::TlsAcceptor::from(dynamic.current())
};
match acceptor.accept(stream).await {
Ok(tls_stream) => {
let mtls_principal = peer_cert_principal(&tls_stream);
serve_connection_stream(broker, tls_stream, spec, peer, mtls_principal).await;
}
Err(e) => tracing::debug!(error = %e, "TLS handshake failed"),
}
} else {
serve_connection_plaintext(broker, stream, spec, peer).await;
}
}
fn peer_cert_principal<S>(
stream: &tokio_rustls::server::TlsStream<S>,
) -> Option<crabka_security::Principal> {
let (_, server_conn) = stream.get_ref();
let cert = server_conn.peer_certificates()?.first()?;
let name = crabka_security::extract_principal_from_cert(cert.as_ref())?;
Some(crabka_security::Principal {
name,
auth_method: crabka_security::AuthMethod::MTls,
groups: vec![],
})
}
async fn serve_connection_plaintext(
broker: std::sync::Arc<Broker>,
stream: TcpStream,
spec: crate::config::ListenerSpec,
peer: SocketAddr,
) {
serve_connection_stream(broker, stream, spec, peer, None).await;
}
#[allow(clippy::too_many_lines)] async fn serve_connection_stream<S>(
broker: std::sync::Arc<Broker>,
stream: S,
spec: crate::config::ListenerSpec,
peer: SocketAddr,
mtls_principal: Option<crabka_security::Principal>,
) where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let mut framed: Framed<S, _> = Framed::new(stream, codec::codec());
let is_sasl_listener = spec.protocol.requires_sasl();
let sasl_mechanisms = crate::network::listener::resolve_sasl_mechanisms_for_listener(
&spec,
&broker.config.enabled_sasl_mechanisms,
)
.to_owned();
#[allow(unused_mut)] let mut auth = if is_sasl_listener {
crate::network::auth::ConnectionAuth::Anonymous
} else if let Some(principal) = mtls_principal {
crate::network::auth::ConnectionAuth::Authenticated {
principal,
mechanism: crabka_security::SaslMechanism::Plain,
expires_at_ms: None,
authenticated_via_token: false,
}
} else {
crate::network::auth::ConnectionAuth::Authenticated {
principal: crabka_security::Principal {
name: "ANONYMOUS".to_string(),
auth_method: crabka_security::AuthMethod::Anonymous,
groups: vec![],
},
mechanism: crabka_security::SaslMechanism::Plain,
expires_at_ms: None,
authenticated_via_token: false,
}
};
tracing::info!(listener = %spec.name, sasl = is_sasl_listener, "connection opened");
let mut client_software_name = String::new();
let mut client_software_version = String::new();
loop {
let deadline: Option<tokio::time::Instant> = match &auth {
crate::network::auth::ConnectionAuth::Authenticated {
expires_at_ms: Some(exp_ms),
..
} => Some(instant_at_epoch_ms(*exp_ms)),
crate::network::auth::ConnectionAuth::Reauthenticating { previous, .. } => {
previous.expires_at_ms.map(instant_at_epoch_ms)
}
_ => None,
};
let frame_result = tokio::select! {
biased;
next = framed.next() => next,
() = sleep_until_some(deadline) => {
tracing::info!(
principal = ?auth_principal_name(&auth),
"SASL session expired, closing connection (KIP-368)"
);
break;
}
};
let frame = match frame_result {
Some(Ok(b)) => b,
Some(Err(e)) => {
tracing::warn!(error = %e, "frame decode error, closing");
break;
}
None => break, };
let req_span = if tracing::enabled!(target: crate::telemetry::REQUEST_TARGET, tracing::Level::DEBUG)
{
match parse_request_header(&frame) {
Ok((api_key, api_version, correlation_id, _body)) => {
crate::telemetry::request_span(
api_key,
api_version,
correlation_id,
peek_client_id(&frame),
&peer,
)
}
Err(_) => tracing::Span::none(),
}
} else {
tracing::Span::none()
};
if is_sasl_listener {
match peek_api_key(&frame) {
Ok(api_key) if !auth.allows_request(api_key) => {
tracing::info!(
api_key,
listener = %spec.name,
"request blocked by per-state auth gate (ILLEGAL_SASL_STATE), closing connection"
);
let _ = codes::ILLEGAL_SASL_STATE; break;
}
Ok(_) => {}
Err(e) => {
tracing::warn!(error = %e, "frame too small to peek api_key, closing");
break;
}
}
}
if let Some(outcome) = try_handle_sasl_frame(&broker, &frame, &mut auth, &sasl_mechanisms)
.instrument(req_span.clone())
.await
{
let SaslFrameOutcome {
response_bytes,
close_after,
} = match outcome {
Ok(o) => o,
Err(e) => {
tracing::warn!(error = %e, "SASL dispatch error, closing connection");
break;
}
};
if let Err(e) = framed.send(response_bytes).await {
tracing::warn!(error = %e, "framed.send error during SASL, closing");
break;
}
if close_after {
tracing::info!("closing connection after failed SaslAuthenticate");
break;
}
continue;
}
macro_rules! intercept {
($call:expr, $label:literal) => {{
match $call.instrument(req_span.clone()).await {
Ok(bytes) => {
if let Err(e) = framed.send(bytes).await {
tracing::warn!(error = %e, concat!("framed.send error during ", $label, ", closing"));
break;
}
continue;
}
Err(e) => {
tracing::warn!(error = %e, concat!($label, " dispatch error, closing connection"));
break;
}
}
}};
}
match peek_api_key(&frame).ok() {
Some(34) => intercept!(
handle_alter_replica_log_dirs_frame(&broker, &frame, &auth, &peer),
"ARLD"
),
Some(51) => intercept!(
handle_alter_user_scram_credentials_frame(&broker, &frame, &auth, &peer),
"AUSCR"
),
Some(57) => intercept!(
handle_update_features_frame(&broker, &frame, &auth, &peer),
"UpdateFeatures"
),
Some(0) => intercept!(
handle_produce_frame(&broker, &frame, &auth, &peer),
"Produce"
),
Some(1) => intercept!(handle_fetch_frame(&broker, &frame, &auth, &peer), "Fetch"),
Some(3) => intercept!(
handle_metadata_frame(&broker, &frame, &auth, &peer),
"Metadata"
),
Some(19) => intercept!(
handle_create_topics_frame(&broker, &frame, &auth, &peer),
"CreateTopics"
),
Some(20) => intercept!(
handle_delete_topics_frame(&broker, &frame, &auth, &peer),
"DeleteTopics"
),
Some(33) => intercept!(
handle_alter_configs_frame(&broker, &frame, &auth, &peer),
"AlterConfigs"
),
Some(44) => intercept!(
handle_incremental_alter_configs_frame(&broker, &frame, &auth, &peer),
"IncrementalAlterConfigs"
),
Some(21) => intercept!(
handle_delete_records_frame(&broker, &frame, &auth, &peer),
"DeleteRecords"
),
Some(37) => intercept!(
handle_create_partitions_frame(&broker, &frame, &auth, &peer),
"CreatePartitions"
),
Some(15) => intercept!(
handle_describe_groups_frame(&broker, &frame, &auth, &peer),
"DescribeGroups"
),
Some(16) => intercept!(
handle_list_groups_frame(&broker, &frame, &auth, &peer),
"ListGroups"
),
Some(77) => intercept!(
handle_share_group_describe_frame(&broker, &frame, &auth, &peer),
"ShareGroupDescribe"
),
Some(78) => intercept!(
handle_share_fetch_frame(&broker, &frame, &auth, &peer),
"ShareFetch"
),
Some(79) => intercept!(
handle_share_acknowledge_frame(&broker, &frame, &auth, &peer),
"ShareAcknowledge"
),
Some(90) => intercept!(
handle_describe_share_group_offsets_frame(&broker, &frame, &auth, &peer),
"DescribeShareGroupOffsets"
),
Some(91) => intercept!(
handle_alter_share_group_offsets_frame(&broker, &frame, &auth, &peer),
"AlterShareGroupOffsets"
),
Some(92) => intercept!(
handle_delete_share_group_offsets_frame(&broker, &frame, &auth, &peer),
"DeleteShareGroupOffsets"
),
Some(42) => intercept!(
handle_delete_groups_frame(&broker, &frame, &auth, &peer),
"DeleteGroups"
),
Some(11) => intercept!(
handle_join_group_frame(&broker, &frame, &auth, &peer),
"JoinGroup"
),
Some(8) => intercept!(
handle_offset_commit_frame(&broker, &frame, &auth, &peer),
"OffsetCommit"
),
Some(9) => intercept!(
handle_offset_fetch_frame(&broker, &frame, &auth, &peer),
"OffsetFetch"
),
Some(47) => intercept!(
handle_offset_delete_frame(&broker, &frame, &auth, &peer),
"OffsetDelete"
),
Some(60) => intercept!(
handle_describe_cluster_frame(&broker, &frame, &auth, &peer),
"DescribeCluster"
),
Some(61) => intercept!(
handle_describe_producers_frame(&broker, &frame, &auth, &peer),
"DescribeProducers"
),
Some(65) => intercept!(
handle_describe_transactions_frame(&broker, &frame, &auth, &peer),
"DescribeTransactions"
),
Some(66) => intercept!(
handle_list_transactions_frame(&broker, &frame, &auth, &peer),
"ListTransactions"
),
Some(64) => intercept!(
handle_unregister_broker_frame(&broker, &frame, &auth, &peer),
"UnregisterBroker"
),
Some(75) => intercept!(
handle_describe_topic_partitions_frame(&broker, &frame, &auth, &peer),
"DescribeTopicPartitions"
),
Some(74) => intercept!(
handle_list_config_resources_frame(&broker, &frame, &auth, &peer),
"ListConfigResources"
),
Some(55) => intercept!(
handle_describe_quorum_frame(&broker, &frame, &auth, &peer),
"DescribeQuorum"
),
Some(80) => intercept!(
handle_add_raft_voter_frame(&broker, &frame, &auth, &peer),
"AddRaftVoter"
),
Some(81) => intercept!(
handle_remove_raft_voter_frame(&broker, &frame, &auth, &peer),
"RemoveRaftVoter"
),
Some(82) => intercept!(
handle_update_raft_voter_frame(&broker, &frame, &auth, &peer),
"UpdateRaftVoter"
),
Some(56) => intercept!(
handle_alter_partition_frame(&broker, &frame, &auth, &peer),
"AlterPartition"
),
Some(63) => intercept!(
handle_broker_heartbeat_frame(&broker, &frame, &auth, &peer),
"BrokerHeartbeat"
),
Some(93) => intercept!(
handle_get_replica_log_info_frame(&broker, &frame, &auth, &peer),
"GetReplicaLogInfo"
),
Some(12) => intercept!(
handle_heartbeat_frame(&broker, &frame, &auth, &peer),
"Heartbeat"
),
Some(14) => intercept!(
handle_sync_group_frame(&broker, &frame, &auth, &peer),
"SyncGroup"
),
Some(13) => intercept!(
handle_leave_group_frame(&broker, &frame, &auth, &peer),
"LeaveGroup"
),
Some(68) => intercept!(
handle_consumer_group_heartbeat_frame(&broker, &frame, &auth, &peer),
"ConsumerGroupHeartbeat"
),
Some(76) => intercept!(
handle_share_group_heartbeat_frame(&broker, &frame, &auth, &peer),
"ShareGroupHeartbeat"
),
Some(88) => intercept!(
handle_streams_group_heartbeat_frame(&broker, &frame, &auth, &peer),
"StreamsGroupHeartbeat"
),
Some(10) => intercept!(
handle_find_coordinator_frame(&broker, &frame, &auth, &peer),
"FindCoordinator"
),
Some(2) => intercept!(
handle_list_offsets_frame(&broker, &frame, &auth, &peer),
"ListOffsets"
),
Some(23) => intercept!(
handle_offset_for_leader_epoch_frame(&broker, &frame, &auth, &peer),
"OffsetForLeaderEpoch"
),
Some(32) => intercept!(
handle_describe_configs_frame(&broker, &frame, &auth, &peer),
"DescribeConfigs"
),
Some(35) => intercept!(
handle_describe_log_dirs_frame(&broker, &frame, &auth, &peer),
"DescribeLogDirs"
),
Some(29) => intercept!(
handle_describe_acls_frame(&broker, &frame, &auth, &peer),
"DescribeAcls"
),
Some(30) => intercept!(
handle_create_acls_frame(&broker, &frame, &auth, &peer),
"CreateAcls"
),
Some(31) => intercept!(
handle_delete_acls_frame(&broker, &frame, &auth, &peer),
"DeleteAcls"
),
Some(43) => intercept!(
handle_elect_leaders_frame(&broker, &frame, &auth, &peer),
"ElectLeaders"
),
Some(45) => intercept!(
handle_alter_partition_reassignments_frame(&broker, &frame, &auth, &peer),
"AlterPartitionReassignments"
),
Some(46) => intercept!(
handle_list_partition_reassignments_frame(&broker, &frame, &auth, &peer),
"ListPartitionReassignments"
),
Some(48) => intercept!(
handle_describe_client_quotas_frame(&broker, &frame, &auth, &peer),
"DescribeClientQuotas"
),
Some(49) => intercept!(
handle_alter_client_quotas_frame(&broker, &frame, &auth, &peer),
"AlterClientQuotas"
),
Some(50) => intercept!(
handle_describe_user_scram_credentials_frame(&broker, &frame, &auth, &peer),
"DescribeUserScramCredentials"
),
Some(38) => intercept!(
handle_create_delegation_token_frame(&broker, &frame, &auth),
"CreateDelegationToken"
),
Some(39) => intercept!(
handle_renew_delegation_token_frame(&broker, &frame, &auth),
"RenewDelegationToken"
),
Some(40) => intercept!(
handle_expire_delegation_token_frame(&broker, &frame, &auth),
"ExpireDelegationToken"
),
Some(41) => intercept!(
handle_describe_delegation_token_frame(&broker, &frame, &auth, &peer),
"DescribeDelegationToken"
),
Some(22) => intercept!(
handle_init_producer_id_frame(&broker, &frame, &auth, &peer),
"InitProducerId"
),
Some(24) => intercept!(
handle_add_partitions_to_txn_frame(&broker, &frame, &auth, &peer),
"AddPartitionsToTxn"
),
Some(26) => intercept!(
handle_end_txn_frame(&broker, &frame, &auth, &peer),
"EndTxn"
),
Some(28) => intercept!(
handle_txn_offset_commit_frame(&broker, &frame, &auth, &peer),
"TxnOffsetCommit"
),
Some(71) => intercept!(
handle_get_telemetry_subscriptions_frame(
&broker,
&frame,
&peer,
&client_software_name,
&client_software_version,
),
"GetTelemetrySubscriptions"
),
Some(72) => intercept!(
handle_push_telemetry_frame(
&broker,
&frame,
&peer,
&client_software_name,
&client_software_version,
),
"PushTelemetry"
),
_ => {}
}
if peek_api_key(&frame).ok() == Some(API_VERSIONS_KEY)
&& let Ok((_, api_version, _, body)) = parse_request_header(&frame)
&& api_version >= 3
{
use crabka_protocol::Decode;
let mut cur: &[u8] = body;
if let Ok(req) =
crabka_protocol::owned::api_versions_request::ApiVersionsRequest::decode(
&mut cur,
api_version,
)
&& crate::handlers::api_versions::is_valid_client_info(&req.client_software_name)
&& crate::handlers::api_versions::is_valid_client_info(&req.client_software_version)
{
client_software_name.clone_from(&req.client_software_name);
client_software_version.clone_from(&req.client_software_version);
}
}
let started = std::time::Instant::now();
let api_key = peek_api_key(&frame).ok();
let response_bytes = match dispatch_one(&broker, &frame)
.instrument(req_span.clone())
.await
{
Ok(b) => b,
Err(e) => {
tracing::warn!(error = %e, "dispatch error, closing connection");
break;
}
};
#[allow(clippy::cast_possible_truncation)]
let elapsed_micros = started.elapsed().as_micros().min(u128::from(u64::MAX)) as u64;
let self_accounts = matches!(api_key, Some(0 | 1));
if !self_accounts && let Some(principal) = auth.principal() {
let client_id_str = peek_client_id(&frame).unwrap_or("");
let image = broker.controller.current_image();
let delay = crate::quota::consume_request_quota(
&image,
&broker.quota_buckets,
&principal.name,
client_id_str,
elapsed_micros,
);
if delay > std::time::Duration::ZERO {
tokio::time::sleep(delay).await;
}
}
if let Err(e) = framed.send(response_bytes).await {
tracing::warn!(error = %e, "framed.send error, closing");
break;
}
}
tracing::info!("connection closed");
}
struct SaslFrameOutcome {
response_bytes: Bytes,
close_after: bool,
}
async fn try_handle_sasl_frame(
broker: &Broker,
frame: &[u8],
auth: &mut crate::network::auth::ConnectionAuth,
sasl_mechanisms: &[crabka_security::SaslMechanism],
) -> Option<Result<SaslFrameOutcome, BrokerError>> {
let api_key = peek_api_key(frame).ok()?;
if api_key != 17 && api_key != 36 {
return None;
}
Some(handle_sasl_frame(broker, frame, auth, api_key, sasl_mechanisms).await)
}
async fn handle_sasl_frame(
broker: &Broker,
frame: &[u8],
auth: &mut crate::network::auth::ConnectionAuth,
api_key: i16,
sasl_mechanisms: &[crabka_security::SaslMechanism],
) -> Result<SaslFrameOutcome, BrokerError> {
use crabka_protocol::{Decode, Encode};
let (parsed_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(parsed_key, api_key);
let body_flexible = handler_body_flexible(api_key, api_version);
let (resp_body, close_after) = match api_key {
17 => {
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::sasl_handshake_request::SaslHandshakeRequest::decode(
&mut cur,
api_version,
)?;
let resp = crate::network::auth::handle_handshake(&req, auth, sasl_mechanisms);
let mut buf = BytesMut::with_capacity(resp.encoded_len(api_version));
resp.encode(&mut buf, api_version)?;
(buf.freeze(), false)
}
36 => {
let mut cur: &[u8] = body;
let req =
crabka_protocol::owned::sasl_authenticate_request::SaslAuthenticateRequest::decode(
&mut cur,
api_version,
)?;
let mech_opt = match auth {
crate::network::auth::ConnectionAuth::Negotiating { mechanism, .. } => {
Some(*mechanism)
}
crate::network::auth::ConnectionAuth::Reauthenticating { previous, .. } => {
Some(previous.mechanism)
}
_ => None,
};
let resp = if let Some(mech) = mech_opt {
match mech {
crabka_security::SaslMechanism::Plain => {
crate::network::auth::handle_authenticate_plain(
&req,
auth,
&broker.config.plain_credentials,
)
}
crabka_security::SaslMechanism::ScramSha256
| crabka_security::SaslMechanism::ScramSha512 => {
crate::network::auth::handle_authenticate_scram(
&req,
auth,
&*broker.controller,
)
}
crabka_security::SaslMechanism::OAuthBearer => {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX));
crate::network::auth::handle_authenticate_oauthbearer(
&req,
auth,
&broker.config.oauthbearer_validator,
now_ms,
broker.config.oauthbearer_max_session_lifetime_seconds,
)
.await
}
crabka_security::SaslMechanism::Gssapi => {
let cfg = broker
.config
.gssapi
.as_ref()
.expect("GSSAPI enabled without config");
crate::network::auth::handle_authenticate_gssapi(&req, auth, cfg)
}
}
} else {
crabka_protocol::owned::sasl_authenticate_response::SaslAuthenticateResponse {
error_code: codes::ILLEGAL_SASL_STATE,
error_message: Some("SaslAuthenticate without prior SaslHandshake".into()),
auth_bytes: Bytes::new(),
session_lifetime_ms: 0,
..Default::default()
}
};
let mech_label = mech_opt.map_or("Unknown", crabka_security::SaslMechanism::wire_name);
broker
.metrics
.record_authentication(mech_label, resp.error_code == 0);
let close = resp.error_code != 0;
let mut buf = BytesMut::with_capacity(resp.encoded_len(api_version));
resp.encode(&mut buf, api_version)?;
(buf.freeze(), close)
}
_ => unreachable!("filtered by caller to 17 / 36 only"),
};
let response_bytes = encode_response(api_key, correlation_id, body_flexible, &resp_body);
Ok(SaslFrameOutcome {
response_bytes,
close_after,
})
}
async fn handle_alter_replica_log_dirs_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use std::collections::BTreeMap;
use crabka_protocol::owned::alter_replica_log_dirs_request::AlterReplicaLogDirsRequest;
use crabka_protocol::owned::alter_replica_log_dirs_response::{
AlterReplicaLogDirPartitionResult, AlterReplicaLogDirTopicResult,
AlterReplicaLogDirsResponse,
};
use crabka_protocol::{Decode, Encode};
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 34);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = auth
.principal()
.cloned()
.unwrap_or_else(|| crabka_security::Principal {
name: "ANONYMOUS".to_string(),
auth_method: crabka_security::AuthMethod::Anonymous,
groups: vec![],
});
let image = broker.controller.current_image();
let authorized = broker.config.authorizer.authorize(
&*image,
&crate::authorizer::AuthorizationRequest {
principal: &principal,
host: peer,
resource_type: crabka_metadata::ResourceType::Cluster,
resource_name: "kafka-cluster",
operation: crabka_metadata::AclOperation::Alter,
},
) == crate::authorizer::AuthorizationResult::Allow;
if !authorized {
let mut cur: &[u8] = body;
let req = AlterReplicaLogDirsRequest::decode(&mut cur, api_version)?;
let mut by_topic: BTreeMap<String, Vec<AlterReplicaLogDirPartitionResult>> =
BTreeMap::new();
for dir in req.dirs {
for topic in dir.topics {
for partition_index in topic.partitions {
by_topic.entry(topic.name.clone()).or_default().push(
AlterReplicaLogDirPartitionResult {
partition_index,
error_code: codes::CLUSTER_AUTHORIZATION_FAILED,
..Default::default()
},
);
}
}
}
let results: Vec<_> = by_topic
.into_iter()
.map(|(name, partitions)| AlterReplicaLogDirTopicResult {
topic_name: name,
partitions,
..Default::default()
})
.collect();
let resp = AlterReplicaLogDirsResponse {
throttle_time_ms: 0,
results,
..Default::default()
};
let mut buf = BytesMut::with_capacity(resp.encoded_len(api_version));
resp.encode(&mut buf, api_version)?;
return Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&buf.freeze(),
));
}
let resp_body =
crate::handlers::alter_replica_log_dirs::handle(broker, api_version, correlation_id, body)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_alter_user_scram_credentials_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::{Decode, Encode};
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 51);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req =
crabka_protocol::owned::alter_user_scram_credentials_request::AlterUserScramCredentialsRequest::decode(
&mut cur, api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp = crate::handlers::alter_user_scram_credentials::handle(broker, req, &ctx).await;
let mut buf = BytesMut::with_capacity(resp.encoded_len(api_version));
resp.encode(&mut buf, api_version)?;
let resp_body = buf.freeze();
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_update_features_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::{Decode, Encode};
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 57);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::update_features_request::UpdateFeaturesRequest::decode(
&mut cur,
api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp = crate::handlers::update_features::handle(broker, req, api_version, &ctx).await;
let mut buf = BytesMut::with_capacity(resp.encoded_len(api_version));
resp.encode(&mut buf, api_version)?;
let resp_body = buf.freeze();
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_cluster_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 60);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::describe_cluster::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_producers_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 61);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::describe_producers::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_transactions_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 65);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::describe_transactions::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_list_transactions_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 66);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::list_transactions::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_unregister_broker_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 64);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::unregister_broker::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_add_raft_voter_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 80);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::add_raft_voter::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_remove_raft_voter_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 81);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::remove_raft_voter::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_update_raft_voter_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 82);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::update_raft_voter::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_alter_partition_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 56);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::alter_partition::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_broker_heartbeat_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 63);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::broker_heartbeat::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_get_replica_log_info_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 93);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::get_replica_log_info::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_heartbeat_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 12);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::heartbeat::handle(broker, api_version, correlation_id, body, &ctx).await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_sync_group_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 14);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::sync_group::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_leave_group_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 13);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::leave_group::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_consumer_group_heartbeat_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 68);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::consumer_group_heartbeat::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_share_group_heartbeat_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 76);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::share_group_heartbeat::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_streams_group_heartbeat_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 88);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::streams_group_heartbeat::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_find_coordinator_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 10);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::find_coordinator::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_list_offsets_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 2);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::list_offsets::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_offset_for_leader_epoch_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 23);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::offset_for_leader_epoch::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_configs_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 32);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::describe_configs::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_log_dirs_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 35);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::describe_log_dirs::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_topic_partitions_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 75);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::describe_topic_partitions::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_list_config_resources_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 74);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::list_config_resources::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_quorum_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 55);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::describe_quorum::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_produce_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 0);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::produce::handle(broker, api_version, correlation_id, body, &ctx).await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_fetch_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 1);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::fetch::handle(broker, api_version, correlation_id, body, &ctx).await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_metadata_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 3);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::metadata::handle(broker, api_version, correlation_id, body, &ctx).await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_create_topics_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 19);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::create_topics::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_delete_topics_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 20);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::delete_topics::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_acls_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::Decode;
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 29);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::describe_acls_request::DescribeAclsRequest::decode(
&mut cur,
api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::describe_acls::handle(broker, req, &ctx, api_version).await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_create_acls_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::Decode;
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 30);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::create_acls_request::CreateAclsRequest::decode(
&mut cur,
api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::create_acls::handle(broker, req, &ctx, api_version).await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_delete_acls_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::Decode;
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 31);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::delete_acls_request::DeleteAclsRequest::decode(
&mut cur,
api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::delete_acls::handle(broker, req, &ctx, api_version).await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_elect_leaders_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::Decode;
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 43);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::elect_leaders_request::ElectLeadersRequest::decode(
&mut cur,
api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::elect_leaders::handle(broker, req, &ctx, api_version).await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_alter_partition_reassignments_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::Decode;
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 45);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::alter_partition_reassignments_request::AlterPartitionReassignmentsRequest::decode(
&mut cur,
api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::alter_partition_reassignments::handle(broker, req, &ctx, api_version)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_list_partition_reassignments_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::Decode;
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 46);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::list_partition_reassignments_request::ListPartitionReassignmentsRequest::decode(
&mut cur,
api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::list_partition_reassignments::handle(broker, req, &ctx, api_version)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_client_quotas_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::Decode;
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 48);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::describe_client_quotas_request::DescribeClientQuotasRequest::decode(
&mut cur,
api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::describe_client_quotas::handle(broker, req, &ctx, api_version).await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_alter_client_quotas_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::Decode;
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 49);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req =
crabka_protocol::owned::alter_client_quotas_request::AlterClientQuotasRequest::decode(
&mut cur,
api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::alter_client_quotas::handle(broker, req, &ctx, api_version).await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_user_scram_credentials_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::Decode;
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 50);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::describe_user_scram_credentials_request::DescribeUserScramCredentialsRequest::decode(
&mut cur,
api_version,
)?;
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::describe_user_scram_credentials::handle(broker, req, &ctx, api_version)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_create_delegation_token_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::{Decode, Encode};
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 38);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::create_delegation_token_request::CreateDelegationTokenRequest::decode(
&mut cur, api_version,
)?;
let resp = crate::handlers::create_delegation_token::handle(
&req,
auth,
broker.config.delegation_token_secret_key.as_ref(),
broker.config.delegation_token_max_lifetime_ms,
broker.config.delegation_token_default_renew_period_ms,
&*broker.controller,
&broker.config.super_users,
)
.await;
let mut buf = BytesMut::with_capacity(resp.encoded_len(api_version));
resp.encode(&mut buf, api_version)?;
let resp_body = buf.freeze();
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_renew_delegation_token_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::{Decode, Encode};
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 39);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::renew_delegation_token_request::RenewDelegationTokenRequest::decode(
&mut cur, api_version,
)?;
let resp = crate::handlers::renew_delegation_token::handle(
&req,
auth,
broker.config.delegation_token_secret_key.as_ref(),
broker.config.delegation_token_default_renew_period_ms,
&*broker.controller,
&broker.config.super_users,
)
.await;
let mut buf = BytesMut::with_capacity(resp.encoded_len(api_version));
resp.encode(&mut buf, api_version)?;
let resp_body = buf.freeze();
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_expire_delegation_token_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::{Decode, Encode};
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 40);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::expire_delegation_token_request::ExpireDelegationTokenRequest::decode(
&mut cur, api_version,
)?;
let resp = crate::handlers::expire_delegation_token::handle(
&req,
auth,
broker.config.delegation_token_secret_key.as_ref(),
&*broker.controller,
&broker.config.super_users,
)
.await;
let mut buf = BytesMut::with_capacity(resp.encoded_len(api_version));
resp.encode(&mut buf, api_version)?;
let resp_body = buf.freeze();
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_delegation_token_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
use crabka_protocol::{Decode, Encode};
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 41);
let body_flexible = handler_body_flexible(api_key, api_version);
let mut cur: &[u8] = body;
let req = crabka_protocol::owned::describe_delegation_token_request::DescribeDelegationTokenRequest::decode(
&mut cur, api_version,
)?;
let resp = crate::handlers::describe_delegation_token::handle(
&req,
auth,
broker.config.delegation_token_secret_key.as_ref(),
&*broker.controller,
peer,
broker.config.authorizer.as_ref(),
)
.await;
let mut buf = BytesMut::with_capacity(resp.encoded_len(api_version));
resp.encode(&mut buf, api_version)?;
let resp_body = buf.freeze();
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_alter_configs_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 33);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::alter_configs::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_incremental_alter_configs_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 44);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::incremental_alter_configs::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_delete_records_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 21);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::delete_records::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_create_partitions_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 37);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::create_partitions::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_groups_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 15);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::describe_groups::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_share_group_describe_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 77);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::share_group_describe::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_describe_share_group_offsets_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 90);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::describe_share_group_offsets::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_alter_share_group_offsets_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 91);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::alter_share_group_offsets::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_delete_share_group_offsets_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 92);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::handlers::delete_share_group_offsets::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_share_fetch_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 78);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::share_fetch::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_share_acknowledge_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 79);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::share_acknowledge::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_list_groups_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 16);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::list_groups::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_delete_groups_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 42);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::delete_groups::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_join_group_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 11);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::join_group::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_offset_commit_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 8);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::offset_commit::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_offset_fetch_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 9);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::offset_fetch::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_offset_delete_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 47);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::offset_delete::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_init_producer_id_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 22);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::handlers::init_producer_id::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_add_partitions_to_txn_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 24);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::txn::handlers::add_partitions_to_txn::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_end_txn_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 26);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body =
crate::txn::handlers::end_txn::handle(broker, api_version, correlation_id, body, &ctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_txn_offset_commit_frame(
broker: &Broker,
frame: &[u8],
auth: &crate::network::auth::ConnectionAuth,
peer: &SocketAddr,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 28);
let body_flexible = handler_body_flexible(api_key, api_version);
let principal = principal_or_anonymous(auth);
let client_id = peek_client_id(frame).unwrap_or("");
let ctx = crate::handlers::RequestContext {
principal,
peer,
client_id,
};
let resp_body = crate::txn::handlers::txn_offset_commit::handle(
broker,
api_version,
correlation_id,
body,
&ctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_get_telemetry_subscriptions_frame(
broker: &Broker,
frame: &[u8],
peer: &SocketAddr,
software_name: &str,
software_version: &str,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 71);
let body_flexible = handler_body_flexible(api_key, api_version);
let client_id = peek_client_id(frame).unwrap_or("");
let tctx = crate::handlers::TelemetryContext {
client_id,
peer,
software_name,
software_version,
};
let resp_body = crate::handlers::get_telemetry_subscriptions::handle(
broker,
api_version,
correlation_id,
body,
&tctx,
)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
async fn handle_push_telemetry_frame(
broker: &Broker,
frame: &[u8],
peer: &SocketAddr,
software_name: &str,
software_version: &str,
) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
debug_assert_eq!(api_key, 72);
let body_flexible = handler_body_flexible(api_key, api_version);
let client_id = peek_client_id(frame).unwrap_or("");
let tctx = crate::handlers::TelemetryContext {
client_id,
peer,
software_name,
software_version,
};
let resp_body =
crate::handlers::push_telemetry::handle(broker, api_version, correlation_id, body, &tctx)
.await?;
Ok(encode_response(
api_key,
correlation_id,
body_flexible,
&resp_body,
))
}
fn peek_api_key(frame: &[u8]) -> Result<i16, BrokerError> {
if frame.len() < 2 {
return Err(BrokerError::Protocol(
crabka_protocol::ProtocolError::InvalidValue(
"request frame: too short to peek api_key",
),
));
}
Ok(i16::from_be_bytes([frame[0], frame[1]]))
}
fn peek_client_id(frame: &[u8]) -> Option<&str> {
if frame.len() < 10 {
return None;
}
let cid_len = i16::from_be_bytes([frame[8], frame[9]]);
if cid_len <= 0 {
return None;
}
#[allow(clippy::cast_sign_loss)]
let n = cid_len as usize;
let start = 10usize;
let end = start.checked_add(n)?;
if frame.len() < end {
return None;
}
std::str::from_utf8(&frame[start..end]).ok()
}
async fn dispatch_one(broker: &Broker, frame: &[u8]) -> Result<Bytes, BrokerError> {
let (api_key, api_version, correlation_id, body) = parse_request_header(frame)?;
let body_flexible = handler_body_flexible(api_key, api_version);
broker.metrics.record_api_request(api_key);
tracing::info!(
api_key,
api_version,
correlation_id,
body_flexible,
body_len = body.len(),
"dispatching request"
);
let handler = broker
.handlers()
.get(api_key)
.ok_or(BrokerError::UnsupportedApi {
api_key,
version: api_version,
});
let resp_body: Bytes = if let Ok(h) = handler {
h(broker, api_version, correlation_id, body).await?
} else {
tracing::warn!(api_key, api_version, "unsupported api, returning error");
broker.metrics.record_unsupported_api_request(api_key);
let mut buf = BytesMut::with_capacity(2);
buf.put_i16(codes::UNSUPPORTED_VERSION);
buf.freeze()
};
let out = encode_response(api_key, correlation_id, body_flexible, &resp_body);
tracing::info!(
api_key,
api_version,
correlation_id,
resp_len = out.len(),
"response built"
);
Ok(out)
}
fn parse_request_header(frame: &[u8]) -> Result<(i16, i16, i32, &[u8]), BrokerError> {
if frame.len() < 8 {
return Err(BrokerError::Protocol(
crabka_protocol::ProtocolError::InvalidValue("request frame < 8 bytes"),
));
}
let mut cur = frame;
let api_key = cur.get_i16();
let api_version = cur.get_i16();
let correlation_id = cur.get_i32();
let body_flexible = handler_body_flexible(api_key, api_version);
let header_v2 = body_flexible;
if cur.remaining() < 2 {
return Err(BrokerError::Protocol(
crabka_protocol::ProtocolError::InvalidValue("request frame: missing client_id length"),
));
}
let cid_len = cur.get_i16();
if cid_len > 0 {
let n = usize::try_from(cid_len).expect("non-negative i16 fits usize");
if cur.remaining() < n {
return Err(BrokerError::Protocol(
crabka_protocol::ProtocolError::InvalidValue(
"request frame: client_id length > available",
),
));
}
cur.advance(n);
}
if header_v2 {
if cur.remaining() < 1 {
return Err(BrokerError::Protocol(
crabka_protocol::ProtocolError::InvalidValue(
"request frame: missing header tagged-fields byte",
),
));
}
let tagged = cur.get_u8();
if tagged != 0 {
tracing::debug!(
api_key,
api_version,
"non-empty header tagged fields ignored"
);
}
}
Ok((api_key, api_version, correlation_id, cur))
}
fn handler_body_flexible(api_key: i16, version: i16) -> bool {
use crabka_protocol::owned;
match api_key {
0 => version >= owned::produce_request::FLEXIBLE_MIN,
1 => version >= owned::fetch_request::FLEXIBLE_MIN,
2 => version >= owned::list_offsets_request::FLEXIBLE_MIN,
3 => version >= owned::metadata_request::FLEXIBLE_MIN,
8 => version >= owned::offset_commit_request::FLEXIBLE_MIN,
9 => version >= owned::offset_fetch_request::FLEXIBLE_MIN,
10 => version >= owned::find_coordinator_request::FLEXIBLE_MIN,
11 => version >= owned::join_group_request::FLEXIBLE_MIN,
12 => version >= owned::heartbeat_request::FLEXIBLE_MIN,
13 => version >= owned::leave_group_request::FLEXIBLE_MIN,
14 => version >= owned::sync_group_request::FLEXIBLE_MIN,
15 => version >= owned::describe_groups_request::FLEXIBLE_MIN,
16 => version >= owned::list_groups_request::FLEXIBLE_MIN,
18 => version >= owned::api_versions_request::FLEXIBLE_MIN,
19 => version >= owned::create_topics_request::FLEXIBLE_MIN,
20 => version >= owned::delete_topics_request::FLEXIBLE_MIN,
21 => version >= owned::delete_records_request::FLEXIBLE_MIN,
22 => version >= owned::init_producer_id_request::FLEXIBLE_MIN,
23 => version >= owned::offset_for_leader_epoch_request::FLEXIBLE_MIN,
24 => version >= owned::add_partitions_to_txn_request::FLEXIBLE_MIN,
25 => version >= owned::add_offsets_to_txn_request::FLEXIBLE_MIN,
26 => version >= owned::end_txn_request::FLEXIBLE_MIN,
27 => version >= owned::write_txn_markers_request::FLEXIBLE_MIN,
28 => version >= owned::txn_offset_commit_request::FLEXIBLE_MIN,
29 => version >= owned::describe_acls_request::FLEXIBLE_MIN,
30 => version >= owned::create_acls_request::FLEXIBLE_MIN,
31 => version >= owned::delete_acls_request::FLEXIBLE_MIN,
32 => version >= owned::describe_configs_request::FLEXIBLE_MIN,
33 => version >= owned::alter_configs_request::FLEXIBLE_MIN,
34 => version >= owned::alter_replica_log_dirs_request::FLEXIBLE_MIN,
35 => version >= owned::describe_log_dirs_request::FLEXIBLE_MIN,
36 => version >= owned::sasl_authenticate_request::FLEXIBLE_MIN,
37 => version >= owned::create_partitions_request::FLEXIBLE_MIN,
38 => version >= owned::create_delegation_token_request::FLEXIBLE_MIN,
39 => version >= owned::renew_delegation_token_request::FLEXIBLE_MIN,
40 => version >= owned::expire_delegation_token_request::FLEXIBLE_MIN,
41 => version >= owned::describe_delegation_token_request::FLEXIBLE_MIN,
42 => version >= owned::delete_groups_request::FLEXIBLE_MIN,
43 => version >= owned::elect_leaders_request::FLEXIBLE_MIN,
44 => version >= owned::incremental_alter_configs_request::FLEXIBLE_MIN,
45 => version >= owned::alter_partition_reassignments_request::FLEXIBLE_MIN,
46 => version >= owned::list_partition_reassignments_request::FLEXIBLE_MIN,
48 => version >= owned::describe_client_quotas_request::FLEXIBLE_MIN,
49 => version >= owned::alter_client_quotas_request::FLEXIBLE_MIN,
50 => version >= owned::describe_user_scram_credentials_request::FLEXIBLE_MIN,
51 => version >= owned::alter_user_scram_credentials_request::FLEXIBLE_MIN,
55 => version >= owned::describe_quorum_request::FLEXIBLE_MIN,
56 => version >= owned::alter_partition_request::FLEXIBLE_MIN,
57 => version >= owned::update_features_request::FLEXIBLE_MIN,
59 => version >= owned::fetch_snapshot_request::FLEXIBLE_MIN,
60 => version >= owned::describe_cluster_request::FLEXIBLE_MIN,
61 => version >= owned::describe_producers_request::FLEXIBLE_MIN,
63 => version >= owned::broker_heartbeat_request::FLEXIBLE_MIN,
64 => version >= owned::unregister_broker_request::FLEXIBLE_MIN,
65 => version >= owned::describe_transactions_request::FLEXIBLE_MIN,
66 => version >= owned::list_transactions_request::FLEXIBLE_MIN,
68 => version >= owned::consumer_group_heartbeat_request::FLEXIBLE_MIN,
69 => version >= owned::consumer_group_describe_request::FLEXIBLE_MIN,
71 => version >= owned::get_telemetry_subscriptions_request::FLEXIBLE_MIN,
72 => version >= owned::push_telemetry_request::FLEXIBLE_MIN,
76 => version >= owned::share_group_heartbeat_request::FLEXIBLE_MIN,
77 => version >= owned::share_group_describe_request::FLEXIBLE_MIN,
88 => version >= owned::streams_group_heartbeat_request::FLEXIBLE_MIN,
89 => version >= owned::streams_group_describe_request::FLEXIBLE_MIN,
78 => version >= owned::share_fetch_request::FLEXIBLE_MIN,
79 => version >= owned::share_acknowledge_request::FLEXIBLE_MIN,
90 => version >= owned::describe_share_group_offsets_request::FLEXIBLE_MIN,
91 => version >= owned::alter_share_group_offsets_request::FLEXIBLE_MIN,
92 => version >= owned::delete_share_group_offsets_request::FLEXIBLE_MIN,
83 => version >= owned::initialize_share_group_state_request::FLEXIBLE_MIN,
84 => version >= owned::read_share_group_state_request::FLEXIBLE_MIN,
85 => version >= owned::write_share_group_state_request::FLEXIBLE_MIN,
86 => version >= owned::delete_share_group_state_request::FLEXIBLE_MIN,
87 => version >= owned::read_share_group_state_summary_request::FLEXIBLE_MIN,
74 => version >= owned::list_config_resources_request::FLEXIBLE_MIN,
75 => version >= owned::describe_topic_partitions_request::FLEXIBLE_MIN,
80 => version >= owned::add_raft_voter_request::FLEXIBLE_MIN,
81 => version >= owned::remove_raft_voter_request::FLEXIBLE_MIN,
82 => version >= owned::update_raft_voter_request::FLEXIBLE_MIN,
93 => version >= owned::get_replica_log_info_request::FLEXIBLE_MIN,
73 => version >= owned::assign_replicas_to_dirs_request::FLEXIBLE_MIN,
_ => false,
}
}
fn encode_response(api_key: i16, correlation_id: i32, body_flexible: bool, body: &[u8]) -> Bytes {
let header_v1 = body_flexible && api_key != API_VERSIONS_KEY;
let header_len = if header_v1 { 5 } else { 4 };
debug_assert!(body.len() < MAX_FRAME_BYTES);
let mut buf = BytesMut::with_capacity(header_len + body.len());
buf.put_i32(correlation_id);
if header_v1 {
buf.put_u8(0); }
buf.put_slice(body);
buf.freeze()
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
#[test]
fn parse_header_v1_no_flexible() {
let mut buf = BytesMut::new();
buf.put_i16(3);
buf.put_i16(8);
buf.put_i32(42);
buf.put_i16(2);
buf.put_slice(b"hi");
let (k, v, c, body) = parse_request_header(&buf).unwrap();
assert!((k, v, c, body.len()) == (3, 8, 42, 0));
}
#[test]
fn parse_header_v2_with_tagged_byte() {
let mut buf = BytesMut::new();
buf.put_i16(18);
buf.put_i16(3);
buf.put_i32(1);
buf.put_i16(1);
buf.put_slice(b"x");
buf.put_u8(0); let (k, v, c, body) = parse_request_header(&buf).unwrap();
assert!((k, v, c, body.len()) == (18, 3, 1, 0));
}
#[test]
fn encode_response_apiversions_uses_v0_header() {
let body = [0u8, 0u8]; let out = encode_response(API_VERSIONS_KEY, 7, true, &body);
assert!(out.len() == 4 + body.len());
}
#[test]
fn peek_api_key_reads_first_two_bytes_big_endian() {
let mut buf = BytesMut::new();
buf.put_i16(18);
buf.put_i16(3);
buf.put_i32(1);
assert!(peek_api_key(&buf).unwrap() == 18);
}
#[test]
fn peek_api_key_rejects_short_frame() {
let buf = [0u8; 1];
assert!(peek_api_key(&buf).is_err());
}
#[test]
fn encode_response_other_flexible_inserts_tagged_byte() {
let body = [0u8, 0u8];
let out = encode_response(3, 7, true, &body);
assert!(out.len() == 5 + body.len());
assert!(out[4] == 0); }
#[test]
fn raft_voter_rpcs_peek_and_flex_routing() {
for api_key in [80i16, 81, 82] {
let mut buf = BytesMut::new();
buf.put_i16(api_key);
buf.put_i16(0); buf.put_i32(1); assert!(peek_api_key(&buf).unwrap() == api_key);
assert!(
handler_body_flexible(api_key, 0),
"api_key {api_key} is flexible from v0"
);
}
}
}