use bytes_parser::BytesParser;
use crate::errors::KonsumerOffsetsError;
use crate::group_metadata::GroupMetadata;
use crate::offset_commit::OffsetCommit;
const MSG_V0_OFFSET_COMMIT: i16 = 0;
const MSG_V1_OFFSET_COMMIT: i16 = 1;
const MSG_V2_GROUP_METADATA: i16 = 2;
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum KonsumerOffsetsData {
OffsetCommit(OffsetCommit),
GroupMetadata(GroupMetadata),
}
impl KonsumerOffsetsData {
pub fn try_from_bytes(
key: Option<&[u8]>,
payload: Option<&[u8]>,
) -> Result<KonsumerOffsetsData, KonsumerOffsetsError> {
match key {
None => Err(KonsumerOffsetsError::MessageKeyMissing),
Some(key_bytes) => {
let mut key_parser = BytesParser::from(key_bytes);
match key_parser.parse_i16() {
Ok(message_version) => match message_version {
MSG_V0_OFFSET_COMMIT..=MSG_V1_OFFSET_COMMIT => {
let mut offset_commit = OffsetCommit::try_from(&mut key_parser, message_version)?;
if let Some(payload_bytes) = payload {
let mut payload_parser = BytesParser::from(payload_bytes);
offset_commit.parse_payload(&mut payload_parser)?;
}
Ok(KonsumerOffsetsData::OffsetCommit(offset_commit))
},
MSG_V2_GROUP_METADATA => {
let mut group_metadata = GroupMetadata::try_from(&mut key_parser, message_version)?;
if let Some(payload_bytes) = payload {
let mut payload_parser = BytesParser::from(payload_bytes);
group_metadata.parse_payload(&mut payload_parser)?;
}
Ok(KonsumerOffsetsData::GroupMetadata(group_metadata))
},
_ => Err(KonsumerOffsetsError::UnsupportedMessageVersion(message_version)),
},
Err(e) => Err(KonsumerOffsetsError::ByteParsingError(e)),
}
},
}
}
pub fn try_from_bytes_vec(
key: Option<Vec<u8>>,
payload: Option<Vec<u8>>,
) -> Result<KonsumerOffsetsData, KonsumerOffsetsError> {
Self::try_from_bytes(key.as_deref(), payload.as_deref())
}
}
#[cfg(test)]
mod tests {
use std::fs;
use std::path::Path;
use rstest::rstest;
use super::*;
use crate::utils::is_thread_safe;
#[rstest]
#[case(01)]
#[case(02)]
#[case(03)]
#[case(04)]
#[case(05)]
fn from_offset_commit(#[case] fixture_id: u16) {
#[cfg(feature = "ts_int")]
let (key_bytes, payload_bytes, fmt_string) = read_offset_commit_fixture(fixture_id, "ts_int");
#[cfg(feature = "ts_chrono")]
let (key_bytes, payload_bytes, fmt_string) = read_offset_commit_fixture(fixture_id, "ts_chrono");
#[cfg(feature = "ts_time")]
let (key_bytes, payload_bytes, fmt_string) = read_offset_commit_fixture(fixture_id, "ts_time");
let konsumer_offsets_data =
KonsumerOffsetsData::try_from_bytes(Some(key_bytes.as_slice()), Some(payload_bytes.as_slice()));
assert!(konsumer_offsets_data.is_ok());
match konsumer_offsets_data.unwrap() {
KonsumerOffsetsData::OffsetCommit(oc) => {
assert_eq!(format!("{:#?}", oc), fmt_string);
},
_ => panic!("Returned wrong enum value!"),
}
}
#[rstest]
#[case(01)]
#[case(02)]
#[case(03)]
#[case(04)]
#[case(05)]
fn from_group_metadata(#[case] fixture_id: u16) {
#[cfg(feature = "ts_int")]
let (key_bytes, payload_bytes, fmt_string) = read_group_metadata_fixture(fixture_id, "ts_int");
#[cfg(feature = "ts_chrono")]
let (key_bytes, payload_bytes, fmt_string) = read_group_metadata_fixture(fixture_id, "ts_chrono");
#[cfg(feature = "ts_time")]
let (key_bytes, payload_bytes, fmt_string) = read_group_metadata_fixture(fixture_id, "ts_time");
let konsumer_offsets_data =
KonsumerOffsetsData::try_from_bytes(Some(key_bytes.as_slice()), Some(payload_bytes.as_slice()));
assert!(konsumer_offsets_data.is_ok());
match konsumer_offsets_data.unwrap() {
KonsumerOffsetsData::GroupMetadata(gm) => {
assert_eq!(format!("{:#?}", gm), fmt_string);
},
_ => panic!("Returned wrong enum value!"),
}
}
fn read_offset_commit_fixture(fixture_id: u16, ts_feature: &str) -> (Vec<u8>, Vec<u8>, String) {
read_fixture("offset_commit", fixture_id, ts_feature)
}
fn read_group_metadata_fixture(fixture_id: u16, ts_feature: &str) -> (Vec<u8>, Vec<u8>, String) {
read_fixture("group_metadata", fixture_id, ts_feature)
}
fn read_fixture(fixture_name: &str, fixture_id: u16, ts_feature: &str) -> (Vec<u8>, Vec<u8>, String) {
let k = format!("fixtures/tests/{fixture_name}/{fixture_id:02}.key");
let key_path = Path::new(k.as_str());
let p = format!("fixtures/tests/{fixture_name}/{fixture_id:02}.payload");
let payload_path = Path::new(p.as_str());
let f = format!("fixtures/tests/{fixture_name}/{fixture_id:02}.{ts_feature}.fmt");
let fmt_path = Path::new(f.as_str());
assert!(key_path.exists());
assert!(payload_path.exists());
assert!(fmt_path.exists());
(fs::read(key_path).unwrap(), fs::read(payload_path).unwrap(), fs::read_to_string(fmt_path).unwrap())
}
#[test]
fn test_types_thread_safety() {
is_thread_safe::<KonsumerOffsetsData>();
}
}