use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, Result};
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{check_compact_array_len, encode_compact_array_len};
/// Kind of version change requested for a single feature.
///
/// The explicit discriminants are the values `UpdateFeaturesRequest::encode_v1`
/// writes as the `i8` upgrade-type field. `encode_v0` only distinguishes
/// [`FeatureUpgradeType::Upgrade`] from the two downgrade kinds.
///
/// NOTE(review): the upstream Kafka schema describes "safe" downgrades as
/// lossless and "unsafe" ones as lossy; nothing in this file enforces that.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
#[repr(i8)]
pub enum FeatureUpgradeType {
    /// Upgrade request (wire value 1).
    Upgrade = 1,
    /// Safe downgrade request (wire value 2).
    SafeDowngrade = 2,
    /// Unsafe downgrade request (wire value 3).
    UnsafeDowngrade = 3,
}

impl FeatureUpgradeType {
    /// Test-only inverse of the `as i8` cast.
    ///
    /// Only the two downgrade discriminants are recognised; every other
    /// value, including ones that match no variant, yields `Upgrade`.
    #[cfg(test)]
    fn from_i8(v: i8) -> Self {
        [Self::SafeDowngrade, Self::UnsafeDowngrade]
            .iter()
            .copied()
            .find(|kind| (*kind as i8) == v)
            .unwrap_or(Self::Upgrade)
    }
}
/// One entry of an `UpdateFeaturesRequest`: a feature name, the requested
/// maximum version level, and the kind of change being asked for.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct FeatureUpdateKey {
    /// Name of the feature to update.
    pub feature: String,
    /// Requested maximum version level for the feature.
    pub max_version_level: i16,
    /// Whether this entry is an upgrade or one of the downgrade kinds.
    pub upgrade_type: FeatureUpgradeType,
}

impl FeatureUpdateKey {
    /// Shared constructor body used by every public constructor below.
    fn with_type(
        feature: impl Into<String>,
        max_version_level: i16,
        upgrade_type: FeatureUpgradeType,
    ) -> Self {
        Self {
            feature: feature.into(),
            max_version_level,
            upgrade_type,
        }
    }

    /// Creates an entry of type [`FeatureUpgradeType::Upgrade`].
    pub fn upgrade(feature: impl Into<String>, max_version_level: i16) -> Self {
        Self::with_type(feature, max_version_level, FeatureUpgradeType::Upgrade)
    }

    /// Creates an entry of type [`FeatureUpgradeType::SafeDowngrade`].
    pub fn safe_downgrade(feature: impl Into<String>, max_version_level: i16) -> Self {
        Self::with_type(feature, max_version_level, FeatureUpgradeType::SafeDowngrade)
    }

    /// Creates an entry of type [`FeatureUpgradeType::UnsafeDowngrade`].
    pub fn unsafe_downgrade(feature: impl Into<String>, max_version_level: i16) -> Self {
        Self::with_type(feature, max_version_level, FeatureUpgradeType::UnsafeDowngrade)
    }

    /// Creates a safe-downgrade entry whose `max_version_level` is 0.
    ///
    /// NOTE(review): presumably the broker treats level 0 as "remove the
    /// finalized feature" -- confirm against the Kafka UpdateFeatures docs.
    pub fn delete(feature: impl Into<String>) -> Self {
        Self::safe_downgrade(feature, 0)
    }
}
/// Request body for the Kafka `UpdateFeatures` API.
///
/// Build it with [`UpdateFeaturesRequest::new`] and the `with_*` methods,
/// then serialize it with [`UpdateFeaturesRequest::encode_v0`],
/// [`UpdateFeaturesRequest::encode_v1`], or through `VersionedEncode`.
#[derive(Debug, Clone)]
pub struct UpdateFeaturesRequest {
    /// Timeout value written first in the request (milliseconds, per the name).
    pub timeout_ms: i32,
    /// Per-feature updates, encoded as a compact array.
    pub feature_updates: Vec<FeatureUpdateKey>,
    /// Validate-only flag; only the v1 layout has a field for it.
    pub validate_only: bool,
}

impl UpdateFeaturesRequest {
    /// Creates a request with `timeout_ms = 60_000` and `validate_only = false`.
    pub fn new(feature_updates: Vec<FeatureUpdateKey>) -> Self {
        Self {
            timeout_ms: 60_000,
            feature_updates,
            validate_only: false,
        }
    }

    /// Returns the request with `timeout_ms` replaced.
    pub fn with_timeout_ms(mut self, timeout_ms: i32) -> Self {
        self.timeout_ms = timeout_ms;
        self
    }

    /// Returns the request with `validate_only` replaced.
    pub fn with_validate_only(mut self, validate_only: bool) -> Self {
        self.validate_only = validate_only;
        self
    }

    /// Encodes the version 0 layout.
    ///
    /// Written in order: `timeout_ms`, a compact array of updates (feature
    /// name as compact string, `max_version_level`, one allow-downgrade byte,
    /// empty tagged fields), then empty top-level tagged fields.
    ///
    /// The allow-downgrade byte is 1 for any `upgrade_type` other than
    /// `Upgrade`, so v0 cannot tell safe and unsafe downgrades apart.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `unsupported_encode!` when
    /// `validate_only` is set: v0 has no field for it, and dropping the flag
    /// would turn a dry run into a real update. Nothing is written to `buf`
    /// in that case. Failures from the array-length, string, and
    /// tagged-field encoders are propagated.
    pub fn encode_v0(&self, buf: &mut impl BufMut) -> Result<()> {
        if self.validate_only {
            // Bound to a local so the macro receives the same argument shape
            // as its other call site in this file.
            let version: i16 = 0;
            return unsupported_encode!("UpdateFeaturesRequest with validate_only", version);
        }

        self.timeout_ms.encode(buf);
        encode_compact_array_len(self.feature_updates.len(), buf)?;
        for update in &self.feature_updates {
            KafkaString::new(&update.feature).try_encode_compact(buf)?;
            update.max_version_level.encode(buf);
            let allow_downgrade = update.upgrade_type != FeatureUpgradeType::Upgrade;
            buf.put_u8(u8::from(allow_downgrade));
            // Per-entry tagged fields (none defined here).
            TaggedFields::default().try_encode(buf)?;
        }
        // Top-level tagged fields (none defined here).
        TaggedFields::default().try_encode(buf)?;
        Ok(())
    }

    /// Encodes the version 1 layout.
    ///
    /// Same as v0 except that each update carries its `upgrade_type`
    /// discriminant as an `i8` instead of the allow-downgrade byte, and
    /// `validate_only` is written as one byte before the top-level tagged
    /// fields.
    ///
    /// # Errors
    ///
    /// Propagates failures from the array-length, string, and tagged-field
    /// encoders.
    pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
        self.timeout_ms.encode(buf);
        encode_compact_array_len(self.feature_updates.len(), buf)?;
        for update in &self.feature_updates {
            KafkaString::new(&update.feature).try_encode_compact(buf)?;
            update.max_version_level.encode(buf);
            buf.put_i8(update.upgrade_type as i8);
            // Per-entry tagged fields (none defined here).
            TaggedFields::default().try_encode(buf)?;
        }
        buf.put_u8(u8::from(self.validate_only));
        // Top-level tagged fields (none defined here).
        TaggedFields::default().try_encode(buf)?;
        Ok(())
    }
}

impl VersionedEncode for UpdateFeaturesRequest {
    /// Dispatches to `encode_v0` or `encode_v1`; any other version yields the
    /// error produced by `unsupported_encode!`.
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            0 => self.encode_v0(buf)?,
            1 => self.encode_v1(buf)?,
            _ => return unsupported_encode!("UpdateFeaturesRequest", version),
        }
        Ok(())
    }
}
/// Per-feature outcome carried in an `UpdateFeaturesResponse`.
#[derive(Debug, Clone)]
pub struct UpdatableFeatureResult {
/// Feature name. `decode_v0` passes the decoded value through
/// `non_nullable_string`, which presumably rejects a null name -- confirm
/// against that helper.
pub feature: String,
/// Error code decoded for this feature.
pub error_code: ErrorCode,
/// Optional error text, taken as-is from the decoded compact string.
pub error_message: Option<String>,
}
/// Decoded body of a Kafka `UpdateFeatures` response.
#[derive(Debug, Clone)]
pub struct UpdateFeaturesResponse {
    /// Throttle time read first from the response (milliseconds, per the name).
    pub throttle_time_ms: i32,
    /// Top-level error code.
    pub error_code: ErrorCode,
    /// Optional top-level error text.
    pub error_message: Option<String>,
    /// Per-feature outcomes, in wire order.
    pub results: Vec<UpdatableFeatureResult>,
}

impl UpdateFeaturesResponse {
    /// Reports whether the top-level `error_code` is OK.
    ///
    /// Per-feature error codes in `results` are not consulted.
    pub fn is_ok(&self) -> bool {
        self.error_code.is_ok()
    }

    /// Decodes the version 0 layout.
    ///
    /// Read in order: throttle time, top-level error code, top-level error
    /// message (compact string), a compact array of per-feature results, and
    /// the top-level tagged fields, which are discarded.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the primitive decoders, from
    /// `check_compact_array_len`, or from `non_nullable_string`.
    pub fn decode_v0(buf: &mut impl Buf) -> Result<Self> {
        let throttle_time_ms = i32::decode(buf)?;
        let error_code = ErrorCode::from(i16::decode(buf)?);
        let error_message = KafkaString::decode_compact(buf)?.0;

        let declared_len = crate::util::varint::decode_unsigned_varint(buf)?;
        let entry_count = check_compact_array_len(declared_len)?;
        let mut results = Vec::with_capacity(entry_count);
        for _ in 0..entry_count {
            results.push(Self::decode_result_entry(buf)?);
        }

        // Top-level tagged fields are read to advance the buffer, then dropped.
        let _ = TaggedFields::decode(buf)?;

        Ok(Self {
            throttle_time_ms,
            error_code,
            error_message,
            results,
        })
    }

    /// Decodes one per-feature result: name, error code, optional message,
    /// then the entry's tagged fields, which are discarded.
    fn decode_result_entry(buf: &mut impl Buf) -> Result<UpdatableFeatureResult> {
        let feature = non_nullable_string("feature", KafkaString::decode_compact(buf)?.0)?;
        let error_code = ErrorCode::from(i16::decode(buf)?);
        let error_message = KafkaString::decode_compact(buf)?.0;
        let _ = TaggedFields::decode(buf)?;
        Ok(UpdatableFeatureResult {
            feature,
            error_code,
            error_message,
        })
    }
}
impl VersionedDecode for UpdateFeaturesResponse {
    /// Decodes versions 0 and 1 with `decode_v0` (both go through the same
    /// decoder); any other version yields the error produced by
    /// `unsupported_decode!`.
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        if matches!(version, 0 | 1) {
            Self::decode_v0(buf)
        } else {
            unsupported_decode!("UpdateFeaturesResponse", version)
        }
    }
}
#[cfg(test)]
// Tests are expected to panic on failure, so unwrap/expect/panic are fine here.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::util::varint;
    use bytes::BytesMut;

    /// Appends an empty tagged-field section (a single zero byte).
    fn put_tagged_fields(buf: &mut BytesMut) {
        buf.put_u8(0);
    }

    /// Appends `text` as a compact string: unsigned varint `len + 1`, then the bytes.
    fn put_compact_str(buf: &mut BytesMut, text: &str) {
        varint::encode_unsigned_varint((text.len() + 1) as u32, &mut *buf);
        buf.put_slice(text.as_bytes());
    }

    /// Reads a compact string that must be present.
    fn read_compact_str(cur: &mut &[u8]) -> String {
        KafkaString::decode_compact(&mut *cur).unwrap().0.unwrap()
    }

    #[test]
    fn test_update_features_request_v0_encode() {
        let updates = vec![
            FeatureUpdateKey::upgrade("metadata.version", 17),
            FeatureUpdateKey::delete("group.version"),
        ];
        let request = UpdateFeaturesRequest::new(updates).with_timeout_ms(30_000);

        let mut wire = BytesMut::new();
        request.encode_v0(&mut wire).unwrap();
        let mut cur = &wire[..];

        assert_eq!(i32::decode(&mut cur).unwrap(), 30_000);
        // Compact array length is the element count plus one.
        assert_eq!(varint::decode_unsigned_varint(&mut cur).unwrap(), 3);

        // First entry: an upgrade, so the allow-downgrade byte is 0.
        assert_eq!(read_compact_str(&mut cur), "metadata.version");
        assert_eq!(i16::decode(&mut cur).unwrap(), 17);
        assert_eq!(cur.get_u8(), 0);
        assert_eq!(cur.get_u8(), 0); // entry tagged fields

        // Second entry: delete is a safe downgrade to level 0, so the byte is 1.
        assert_eq!(read_compact_str(&mut cur), "group.version");
        assert_eq!(i16::decode(&mut cur).unwrap(), 0);
        assert_eq!(cur.get_u8(), 1);
        assert_eq!(cur.get_u8(), 0); // entry tagged fields

        assert_eq!(cur.get_u8(), 0); // top-level tagged fields
        assert!(cur.is_empty());
    }

    #[test]
    fn test_update_features_request_v1_encode() {
        let updates = vec![FeatureUpdateKey::safe_downgrade("metadata.version", 15)];
        let request = UpdateFeaturesRequest::new(updates).with_validate_only(true);

        let mut wire = BytesMut::new();
        request.encode_v1(&mut wire).unwrap();
        let mut cur = &wire[..];

        // Default timeout set by `new`.
        assert_eq!(i32::decode(&mut cur).unwrap(), 60_000);
        assert_eq!(varint::decode_unsigned_varint(&mut cur).unwrap(), 2);

        assert_eq!(read_compact_str(&mut cur), "metadata.version");
        assert_eq!(i16::decode(&mut cur).unwrap(), 15);
        assert_eq!(cur.get_i8(), 2); // SafeDowngrade discriminant
        assert_eq!(cur.get_u8(), 0); // entry tagged fields

        assert_eq!(cur.get_u8(), 1); // validate_only
        assert_eq!(cur.get_u8(), 0); // top-level tagged fields
        assert!(cur.is_empty());
    }

    #[test]
    fn test_update_features_request_versioned_dispatch() {
        let request = UpdateFeaturesRequest::new(vec![FeatureUpdateKey::upgrade("test", 1)]);

        let mut v0_bytes = BytesMut::new();
        request.encode_versioned(0, &mut v0_bytes).unwrap();
        let mut v1_bytes = BytesMut::new();
        request.encode_versioned(1, &mut v1_bytes).unwrap();
        // The two layouts differ, so the dispatcher must have picked different encoders.
        assert_ne!(v0_bytes, v1_bytes);

        let mut v2_bytes = BytesMut::new();
        assert!(request.encode_versioned(2, &mut v2_bytes).is_err());
    }

    #[test]
    fn test_update_features_response_decode_v0() {
        let mut wire = BytesMut::new();
        wire.put_i32(100); // throttle_time_ms
        wire.put_i16(0); // top-level error code
        varint::encode_unsigned_varint(0, &mut wire); // null top-level message
        varint::encode_unsigned_varint(2, &mut wire); // one result (count + 1)
        put_compact_str(&mut wire, "metadata.version");
        wire.put_i16(0); // per-feature error code
        varint::encode_unsigned_varint(0, &mut wire); // null per-feature message
        put_tagged_fields(&mut wire); // entry
        put_tagged_fields(&mut wire); // top level

        let resp = UpdateFeaturesResponse::decode_v0(&mut wire.freeze()).unwrap();

        assert_eq!(resp.throttle_time_ms, 100);
        assert!(resp.is_ok());
        assert!(resp.error_message.is_none());
        assert_eq!(resp.results.len(), 1);

        let first = &resp.results[0];
        assert_eq!(first.feature, "metadata.version");
        assert!(first.error_code.is_ok());
        assert!(first.error_message.is_none());
    }

    #[test]
    fn test_update_features_response_decode_v0_with_error() {
        let mut wire = BytesMut::new();
        wire.put_i32(0); // throttle_time_ms
        wire.put_i16(0); // top-level error code stays OK
        varint::encode_unsigned_varint(1, &mut wire); // empty top-level message
        varint::encode_unsigned_varint(2, &mut wire); // one result (count + 1)
        put_compact_str(&mut wire, "bad.feature");
        wire.put_i16(1); // non-zero per-feature error code
        put_compact_str(&mut wire, "not supported");
        put_tagged_fields(&mut wire); // entry
        put_tagged_fields(&mut wire); // top level

        let resp = UpdateFeaturesResponse::decode_v0(&mut wire.freeze()).unwrap();

        // `is_ok` reflects only the top-level code, which is 0 here.
        assert!(resp.is_ok());

        let first = &resp.results[0];
        assert_eq!(first.feature, "bad.feature");
        assert!(!first.error_code.is_ok());
        assert_eq!(first.error_message.as_deref(), Some("not supported"));
    }

    #[test]
    fn test_feature_update_key_constructors() {
        let upgrade = FeatureUpdateKey::upgrade("f1", 5);
        assert_eq!(upgrade.upgrade_type, FeatureUpgradeType::Upgrade);
        assert_eq!(upgrade.max_version_level, 5);

        let safe = FeatureUpdateKey::safe_downgrade("f2", 3);
        assert_eq!(safe.upgrade_type, FeatureUpgradeType::SafeDowngrade);

        let unsf = FeatureUpdateKey::unsafe_downgrade("f3", 1);
        assert_eq!(unsf.upgrade_type, FeatureUpgradeType::UnsafeDowngrade);

        let del = FeatureUpdateKey::delete("f4");
        assert_eq!(del.max_version_level, 0);
        assert_eq!(del.upgrade_type, FeatureUpgradeType::SafeDowngrade);
    }

    #[test]
    fn test_feature_upgrade_type_from_i8() {
        let expectations = [
            (1, FeatureUpgradeType::Upgrade),
            (2, FeatureUpgradeType::SafeDowngrade),
            (3, FeatureUpgradeType::UnsafeDowngrade),
            // Unknown values fall back to `Upgrade`.
            (99, FeatureUpgradeType::Upgrade),
        ];
        for (raw, expected) in expectations.iter().copied() {
            assert_eq!(FeatureUpgradeType::from_i8(raw), expected);
        }
    }
}