use std::collections::HashMap;
use crate::error::{Error, Result};
use crate::protocol::{
API_VERSION_FETCH, API_VERSION_FIND_COORDINATOR, API_VERSION_LIST_OFFSETS,
API_VERSION_METADATA, API_VERSION_OFFSET_COMMIT, API_VERSION_OFFSET_FETCH,
API_VERSION_PRODUCE,
};
use tracing::{debug, info};
use crate::client::network::KafkaConnection;
pub mod api_key {
pub const PRODUCE: i16 = 0;
pub const FETCH: i16 = 1;
pub const LIST_OFFSETS: i16 = 2;
pub const METADATA: i16 = 3;
pub const FIND_COORDINATOR: i16 = 10;
pub const OFFSET_COMMIT: i16 = 8;
pub const OFFSET_FETCH: i16 = 9;
pub const API_VERSIONS: i16 = 18;
}
const API_VERSIONS_REQUEST_VERSION: i16 = 3;
#[derive(Debug, Clone)]
pub struct BrokerApiVersions {
versions: HashMap<i16, (i16, i16)>, }
impl BrokerApiVersions {
fn from_response(
resp: kafka_protocol::messages::ApiVersionsResponse,
) -> BrokerApiVersions {
let mut versions = HashMap::new();
for av in resp.api_keys {
versions.insert(av.api_key, (av.min_version, av.max_version));
}
BrokerApiVersions { versions }
}
pub fn negotiate(&self, api_key: i16, fallback: i16) -> i16 {
match self.versions.get(&api_key) {
Some(&(min, max)) => {
if fallback < min {
debug!(
"API key {}: our version {} below broker min {}, using min",
api_key, fallback, min
);
min
} else if fallback > max {
debug!(
"API key {}: our version {} above broker max {}, using max",
api_key, fallback, max
);
max
} else {
fallback
}
}
None => {
debug!("API key {}: not supported by broker, using fallback {}", api_key, fallback);
fallback
}
}
}
}
pub fn fetch_api_versions(
conn: &mut KafkaConnection,
correlation_id: i32,
client_id: &str,
) -> Result<BrokerApiVersions> {
use bytes::BytesMut;
use kafka_protocol::messages::{ApiVersionsRequest, RequestHeader};
use kafka_protocol::protocol::{Decodable, Encodable};
use kafka_protocol::protocol::StrBytes;
let request = ApiVersionsRequest::default()
.with_client_software_name(StrBytes::from_static_str("rustfs-kafka"));
let header = RequestHeader::default()
.with_request_api_key(api_key::API_VERSIONS)
.with_request_api_version(API_VERSIONS_REQUEST_VERSION)
.with_correlation_id(correlation_id)
.with_client_id(Some(StrBytes::from_string(client_id.to_owned())));
let mut header_buf = BytesMut::new();
header.encode(&mut header_buf, API_VERSIONS_REQUEST_VERSION)
.map_err(|_| Error::CodecError)?;
let mut body_buf = BytesMut::new();
request.encode(&mut body_buf, API_VERSIONS_REQUEST_VERSION)
.map_err(|_| Error::CodecError)?;
let total_len = (header_buf.len() + body_buf.len()) as i32;
let mut out = BytesMut::with_capacity(4 + total_len as usize);
out.extend_from_slice(&total_len.to_be_bytes());
out.extend_from_slice(&header_buf);
out.extend_from_slice(&body_buf);
conn.send(&out)?;
let size = {
let mut buf = [0u8; 4];
conn.read_exact(&mut buf)?;
i32::from_be_bytes(buf)
};
let resp_bytes = conn.read_exact_alloc(size as u64)?;
let mut bytes = bytes::Bytes::from(resp_bytes);
use kafka_protocol::messages::ResponseHeader;
let _resp_header = ResponseHeader::decode(&mut bytes, API_VERSIONS_REQUEST_VERSION)
.map_err(|_| Error::CodecError)?;
let kp_resp = kafka_protocol::messages::ApiVersionsResponse::decode(
&mut bytes,
API_VERSIONS_REQUEST_VERSION,
)
.map_err(|_| Error::CodecError)?;
let result = BrokerApiVersions::from_response(kp_resp);
info!("Negotiated API versions: {:?}", result);
Ok(result)
}
#[derive(Debug, Default)]
pub struct ApiVersionCache {
broker_versions: HashMap<String, BrokerApiVersions>,
}
impl ApiVersionCache {
pub fn new() -> Self {
ApiVersionCache {
broker_versions: HashMap::new(),
}
}
pub fn contains(&self, host: &str) -> bool {
self.broker_versions.contains_key(host)
}
pub fn insert(&mut self, host: String, versions: BrokerApiVersions) {
self.broker_versions.insert(host, versions);
}
pub fn get_or_fetch(
&mut self,
host: &str,
conn: &mut KafkaConnection,
correlation_id: i32,
client_id: &str,
) -> Result<&BrokerApiVersions> {
if !self.broker_versions.contains_key(host) {
let versions = fetch_api_versions(conn, correlation_id, client_id)?;
self.broker_versions.insert(host.to_owned(), versions);
}
Ok(self.broker_versions.get(host).unwrap())
}
pub fn invalidate(&mut self, host: &str) {
self.broker_versions.remove(host);
}
pub fn negotiate(&self, host: &str, api_key: i16, fallback: i16) -> i16 {
self.broker_versions
.get(host)
.map(|v| v.negotiate(api_key, fallback))
.unwrap_or(fallback)
}
}
pub fn resolve_api_version(
cache: &ApiVersionCache,
host: &str,
api_key: i16,
default: i16,
) -> i16 {
cache.negotiate(host, api_key, default)
}
pub fn resolve_all_api_versions(cache: &ApiVersionCache, host: &str) -> ApiVersions {
ApiVersions {
produce: resolve_api_version(cache, host, api_key::PRODUCE, API_VERSION_PRODUCE),
fetch: resolve_api_version(cache, host, api_key::FETCH, API_VERSION_FETCH),
metadata: resolve_api_version(cache, host, api_key::METADATA, API_VERSION_METADATA),
list_offsets: resolve_api_version(cache, host, api_key::LIST_OFFSETS, API_VERSION_LIST_OFFSETS),
find_coordinator: resolve_api_version(cache, host, api_key::FIND_COORDINATOR, API_VERSION_FIND_COORDINATOR),
offset_commit: resolve_api_version(cache, host, api_key::OFFSET_COMMIT, API_VERSION_OFFSET_COMMIT),
offset_fetch: resolve_api_version(cache, host, api_key::OFFSET_FETCH, API_VERSION_OFFSET_FETCH),
}
}
#[derive(Debug, Copy, Clone)]
pub struct ApiVersions {
pub produce: i16,
pub fetch: i16,
pub metadata: i16,
pub list_offsets: i16,
pub find_coordinator: i16,
pub offset_commit: i16,
pub offset_fetch: i16,
}
impl Default for ApiVersions {
fn default() -> Self {
ApiVersions {
produce: API_VERSION_PRODUCE,
fetch: API_VERSION_FETCH,
metadata: API_VERSION_METADATA,
list_offsets: API_VERSION_LIST_OFFSETS,
find_coordinator: API_VERSION_FIND_COORDINATOR,
offset_commit: API_VERSION_OFFSET_COMMIT,
offset_fetch: API_VERSION_OFFSET_FETCH,
}
}
}