use bytes_parser::BytesParser;
use crate::errors::{KonsumerOffsetsError, KonsumerOffsetsError::UnsupportedOffsetCommitSchema};
use crate::utils::{parse_i16, parse_i32, parse_i64, parse_str};
/// A single offset-commit record: which group committed which offset for a
/// given topic/partition, plus the metadata stored alongside the commit.
///
/// NOTE(review): this appears to model a record of Kafka's `__consumer_offsets`
/// topic; confirm against the crate-level documentation.
///
/// An instance is built in two steps: `try_from` fills in the identifying
/// fields (`message_version`, `group`, `topic`, `partition`) and marks the
/// record as a tombstone; `parse_payload` then fills in the remaining fields
/// and clears the tombstone flag.
///
/// The type of the two timestamp fields is selected at compile time by the
/// `ts_int`, `ts_chrono` and `ts_time` features; each feature declares its own
/// `commit_timestamp`/`expire_timestamp` pair, so the struct is only well-formed
/// when exactly one of them is enabled.
///
/// `Default` is derived for `ts_int`/`ts_chrono`; for `ts_time` a manual
/// implementation is provided separately.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(any(feature = "ts_int", feature = "ts_chrono"), derive(Default))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct OffsetCommit {
/// Version of the message, as handed to `try_from` by the caller
/// (it is not read from the parser by this type).
pub message_version: i16,
/// Group identifier; the first string read by `try_from`.
pub group: String,
/// Topic name; the second string read by `try_from`.
pub topic: String,
/// Partition number; read by `try_from` right after the topic.
pub partition: i32,
/// `true` when only the identifying fields were parsed (the state produced by
/// `try_from`); set to `false` as soon as `parse_payload` is invoked.
/// While `true`, the fields below still hold their `Default` values.
pub is_tombstone: bool,
/// Schema version of the payload; the first value read by `parse_payload`.
/// Only versions `0..=3` are accepted, anything else is reported as
/// `UnsupportedOffsetCommitSchema`.
pub schema_version: i16,
/// The committed offset.
pub offset: i64,
/// Leader epoch; only read from the payload when `schema_version >= 3`,
/// otherwise `parse_payload` sets it to `-1`.
pub leader_epoch: i32,
/// Metadata string stored with the commit.
pub metadata: String,
/// Commit timestamp, kept as the raw `i64` read from the payload.
#[cfg(feature = "ts_int")]
pub commit_timestamp: i64,
/// Commit timestamp, as produced by `crate::utils::parse_chrono_datetime_utc`.
#[cfg(feature = "ts_chrono")]
pub commit_timestamp: chrono::DateTime<chrono::Utc>,
/// Commit timestamp, as produced by `crate::utils::parse_time_offset_datetime`.
#[cfg(feature = "ts_time")]
pub commit_timestamp: time::OffsetDateTime,
/// Expiration timestamp as a raw `i64`; only read from the payload when
/// `schema_version == 1`, otherwise `parse_payload` sets it to `-1`.
#[cfg(feature = "ts_int")]
pub expire_timestamp: i64,
/// Expiration timestamp; only read from the payload when `schema_version == 1`,
/// otherwise `parse_payload` sets it to `chrono::DateTime::<chrono::Utc>::default()`.
#[cfg(feature = "ts_chrono")]
pub expire_timestamp: chrono::DateTime<chrono::Utc>,
/// Expiration timestamp; only read from the payload when `schema_version == 1`,
/// otherwise `parse_payload` sets it to `time::OffsetDateTime::UNIX_EPOCH`.
#[cfg(feature = "ts_time")]
pub expire_timestamp: time::OffsetDateTime,
}
/// Hand-written `Default` used only with the `ts_time` feature, where the
/// struct does not derive it.
///
/// NOTE(review): presumably needed because `time::OffsetDateTime` offers no
/// `Default` implementation; confirm against the `time` crate version in use.
#[cfg(feature = "ts_time")]
impl Default for OffsetCommit {
    /// Returns a commit with zeroed numbers, empty strings, `is_tombstone`
    /// unset and both timestamps at the UNIX epoch.
    fn default() -> Self {
        OffsetCommit {
            message_version: 0,
            group: String::new(),
            topic: String::new(),
            partition: 0,
            is_tombstone: false,
            schema_version: 0,
            offset: 0,
            leader_epoch: 0,
            metadata: String::new(),
            // The epoch stands in for "no timestamp".
            commit_timestamp: time::OffsetDateTime::UNIX_EPOCH,
            expire_timestamp: time::OffsetDateTime::UNIX_EPOCH,
        }
    }
}
impl OffsetCommit {
    /// Starts an [`OffsetCommit`] from the identifying fields of a record.
    ///
    /// Reads, in this order, the group (string), the topic (string) and the
    /// partition (`i32`) from `parser`. The returned value is flagged as a
    /// tombstone and every remaining field holds its `Default` value until
    /// [`OffsetCommit::parse_payload`] is called on it.
    ///
    /// `message_version` is stored as given; it is not read from `parser`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the parsing helpers.
    pub(crate) fn try_from(parser: &mut BytesParser, message_version: i16) -> Result<Self, KonsumerOffsetsError> {
        let group = parse_str(parser)?;
        let topic = parse_str(parser)?;
        let partition = parse_i32(parser)?;

        Ok(Self {
            message_version,
            group,
            topic,
            partition,
            is_tombstone: true,
            ..Self::default()
        })
    }

    /// Fills in the payload fields of this commit from `parser`.
    ///
    /// Fields are read in this order: schema version (`i16`), offset (`i64`),
    /// leader epoch (`i32`, schema version 3 only), metadata (string), commit
    /// timestamp, expire timestamp (schema version 1 only). Fields that are
    /// absent for the parsed schema version get a placeholder: `-1` for the
    /// leader epoch, and for the expire timestamp `-1` (`ts_int`), the chrono
    /// default (`ts_chrono`) or the UNIX epoch (`ts_time`).
    ///
    /// The tombstone flag is cleared before anything is read, so it stays
    /// cleared even if parsing fails part-way through.
    ///
    /// # Errors
    ///
    /// Returns `UnsupportedOffsetCommitSchema` when the schema version is
    /// outside `0..=3`, and propagates any error from the parsing helpers.
    pub(crate) fn parse_payload(&mut self, parser: &mut BytesParser) -> Result<(), KonsumerOffsetsError> {
        self.is_tombstone = false;

        // The version is stored before validation, matching the observable
        // state callers get on the error path.
        self.schema_version = parse_i16(parser)?;
        match self.schema_version {
            0..=3 => {}
            unsupported => return Err(UnsupportedOffsetCommitSchema(unsupported)),
        }

        self.offset = parse_i64(parser)?;

        // After the range check above, 3 is the only version carrying a leader epoch.
        self.leader_epoch = match self.schema_version {
            3 => parse_i32(parser)?,
            _ => -1,
        };

        self.metadata = parse_str(parser)?;

        // Exactly one of the blocks below is compiled in, depending on the
        // timestamp feature. Each reads the commit timestamp first and then,
        // for schema version 1 only, the expire timestamp.
        #[cfg(feature = "ts_int")]
        {
            self.commit_timestamp = parse_i64(parser)?;
            self.expire_timestamp = if self.schema_version == 1 {
                parse_i64(parser)?
            } else {
                -1
            };
        }

        #[cfg(feature = "ts_chrono")]
        {
            self.commit_timestamp = crate::utils::parse_chrono_datetime_utc(parser)?;
            self.expire_timestamp = if self.schema_version == 1 {
                crate::utils::parse_chrono_datetime_utc(parser)?
            } else {
                chrono::DateTime::<chrono::Utc>::default()
            };
        }

        #[cfg(feature = "ts_time")]
        {
            self.commit_timestamp = crate::utils::parse_time_offset_datetime(parser)?;
            self.expire_timestamp = if self.schema_version == 1 {
                crate::utils::parse_time_offset_datetime(parser)?
            } else {
                time::OffsetDateTime::UNIX_EPOCH
            };
        }

        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use crate::{utils::is_thread_safe, OffsetCommit};

    /// Instantiates `is_thread_safe` with `OffsetCommit`.
    ///
    /// NOTE(review): presumably `is_thread_safe` carries trait bounds such as
    /// `Send + Sync`, making this a compile-time check; confirm in `crate::utils`.
    #[test]
    fn test_types_thread_safety() {
        is_thread_safe::<OffsetCommit>();
    }
}