1use std::borrow::Cow;
2use std::convert::TryFrom;
3
4use thiserror::Error;
5
6use nom::number::complete::{be_u16, be_i32, be_u64, be_u32};
7use nom::multi::length_data;
8use nom::sequence::tuple;
9use nom::error::{ErrorKind, ParseError};
10
// Parser result type used throughout this file: nom's `IResult` with the error
// type fixed to `ConsumerOffsetsMessageParseError` over the same input type.
type IResult<I, O> = nom::IResult<I, O, ConsumerOffsetsMessageParseError<I>>;
12
/// Errors reported while decoding consumer-offsets message keys and values.
///
/// `I` is the parser input type; it is kept inside [`Nom`](Self::Nom) so the
/// caller can see the input the failing parser was given.
#[derive(Error, Debug)]
pub enum ConsumerOffsetsMessageParseError<I: std::fmt::Debug> {
    /// A length-prefixed string did not contain valid UTF-8.
    #[error("invalid utf8 sequence")]
    FromUtf8Error(#[from] std::str::Utf8Error),
    /// A nom parser rejected the input. Holds the input handed to the failing
    /// parser and the nom `ErrorKind` describing the failure.
    #[error("parsing error")]
    Nom(I, ErrorKind),
    /// The parser reported `nom::Err::Incomplete`, i.e. more bytes were
    /// required than the input contained.
    #[error("insufficient data")]
    Incomplete,
}
24
25impl<I: std::fmt::Debug> ParseError<I> for ConsumerOffsetsMessageParseError<I> {
26 fn from_error_kind(input: I, kind: ErrorKind) -> Self {
27 ConsumerOffsetsMessageParseError::Nom(input, kind)
28 }
29
30 fn append(_: I, _: ErrorKind, other: Self) -> Self {
31 other
32 }
33}
34
/// Decoded value of an offset-commit record.
///
/// The metadata string borrows from the parsed buffer where possible, hence
/// the `'a` lifetime.
#[derive(Debug, Clone)]
pub struct OffsetCommitValue<'a> {
    /// Schema version recorded by the body parser that produced this value.
    pub version: u16,
    /// Committed offset, read as a big-endian `u64`.
    pub offset: u64,
    /// Leader epoch; `None` when the layout has no such field or when the
    /// wire value was `u32::MAX`.
    pub leader_epoch: Option<u32>,
    // Kept private; exposed as `&str` through `metadata()`.
    metadata: Cow<'a, str>,
    /// Commit timestamp, read as a big-endian `u64`.
    pub commit_timestamp: u64,
    /// Expire timestamp; `Some` only for the layout that carries this field.
    pub expire_timestamp: Option<u64>,
}

impl OffsetCommitValue<'_> {
    /// Returns the metadata string attached to the commit.
    pub fn metadata(&self) -> &str {
        self.metadata.as_ref()
    }
}
51
52impl<'a> TryFrom<&'a [u8]> for OffsetCommitValue<'a> {
53 type Error = ConsumerOffsetsMessageParseError<&'a [u8]>;
54
55 fn try_from(bytes: &'a [u8]) -> Result<Self, Self::Error> {
56 match parse_offset_commit_value(bytes) {
57 Ok((_, res)) => Ok(res),
58 Err(nom::Err::Error(err)) |
59 Err(nom::Err::Failure(err)) => Err(err),
60 Err(nom::Err::Incomplete(_)) => Err(ConsumerOffsetsMessageParseError::Incomplete),
61 }
62 }
63}
64
/// Decoded key of a consumer-offsets record.
#[derive(Debug, Clone)]
pub enum ConsumerOffsetsMessageKey<'a> {
    /// Key of an offset-commit record (key versions 0 and 1): group, topic and
    /// partition.
    Offset(OffsetKey<'a>),

    /// Produced for every other key version; the key payload is not decoded.
    GroupMetadata,
}
78
/// Identifies the group/topic/partition an offset commit belongs to.
///
/// The strings borrow from the parsed buffer where possible, hence the `'a`
/// lifetime.
#[derive(Debug, Clone)]
pub struct OffsetKey<'a> {
    // Both strings are private; use `group()` / `topic()` to read them.
    group: Cow<'a, str>,
    topic: Cow<'a, str>,
    /// Partition number, read as a big-endian `i32`.
    pub partition: i32,
}

impl OffsetKey<'_> {
    /// Returns the consumer group name.
    pub fn group(&self) -> &str {
        self.group.as_ref()
    }

    /// Returns the topic name.
    pub fn topic(&self) -> &str {
        self.topic.as_ref()
    }
}
96
97impl<'a> TryFrom<&'a [u8]> for ConsumerOffsetsMessageKey<'a> {
98 type Error = ConsumerOffsetsMessageParseError<&'a [u8]>;
99
100 fn try_from(bytes: &'a [u8]) -> Result<Self, Self::Error> {
101 match parse_consumer_offsets_message_key(bytes) {
102 Ok((_, res)) => Ok(res),
103 Err(nom::Err::Error(err)) |
104 Err(nom::Err::Failure(err)) => Err(err),
105 Err(nom::Err::Incomplete(_)) => Err(ConsumerOffsetsMessageParseError::Incomplete),
106 }
107 }
108}
109
110fn length_str(bytes: &[u8]) -> IResult<&[u8], &str> {
117 let (bytes, sbuf) = length_data(be_u16)(bytes)?;
118 match std::str::from_utf8(sbuf) {
119 Ok(s) => Ok((bytes, s)),
120 Err(e) => Err(nom::Err::Error(From::from(e)))
121 }
122}
123
124fn parse_offset_commit_value0(bytes: &[u8]) -> IResult<&[u8], OffsetCommitValue> {
125 let (bytes, (offset, metadata, commit_timestamp)) = tuple((be_u64, length_str, be_u64))(bytes)?;
126 Ok((bytes, OffsetCommitValue {
127 version: 0,
128 offset,
129 metadata: Cow::Borrowed(metadata),
130 commit_timestamp,
131 expire_timestamp: None,
132 leader_epoch: None,
133 }))
134}
135
136fn parse_offset_commit_value1(bytes: &[u8]) -> IResult<&[u8], OffsetCommitValue> {
137 let (bytes, (offset, metadata, commit_timestamp, expire_timestamp)) = tuple((be_u64, length_str, be_u64, be_u64))(bytes)?;
138 Ok((bytes, OffsetCommitValue {
139 version: 1,
140 offset,
141 metadata: Cow::Borrowed(metadata),
142 commit_timestamp,
143 expire_timestamp: Some(expire_timestamp),
144 leader_epoch: None,
145 }))
146}
147
148fn parse_offset_commit_value3(bytes: &[u8]) -> IResult<&[u8], OffsetCommitValue> {
149 let (bytes, (offset, leader_epoch, metadata, commit_timestamp)) = tuple((be_u64, be_u32, length_str, be_u64))(bytes)?;
150 Ok((bytes, OffsetCommitValue {
151 version: 3,
152 offset,
153 metadata: Cow::Borrowed(metadata),
154 commit_timestamp,
155 leader_epoch: if leader_epoch == u32::MAX { None } else { Some(leader_epoch) },
156 expire_timestamp: None,
157 }))
158}
159
160fn parse_offset_commit_value(bytes: &[u8]) -> IResult<&[u8], OffsetCommitValue> {
161 let (bytes, version) = be_u16(bytes)?;
162 match version {
163 0 => parse_offset_commit_value0(bytes),
164 1..=2 => parse_offset_commit_value1(bytes),
165 3 => parse_offset_commit_value3(bytes),
166 _ => Err(nom::Err::Error(ConsumerOffsetsMessageParseError::Nom(bytes, ErrorKind::Fail)))
167 }
168}
169
170fn parse_offset_key(bytes: &[u8]) -> IResult<&[u8], ConsumerOffsetsMessageKey> {
171 let (bytes, (group, topic, partition)) = tuple((length_str, length_str, be_i32))(bytes)?;
172
173 let offset_key = OffsetKey {
174 group: Cow::Borrowed(group),
175 topic: Cow::Borrowed(topic),
176 partition
177 };
178 Ok((bytes, ConsumerOffsetsMessageKey::Offset(offset_key)))
179}
180
181fn parse_consumer_offsets_message_key(bytes: &[u8]) -> IResult<&[u8], ConsumerOffsetsMessageKey> {
182 let (bytes, version) = be_u16(bytes)?;
183 match version {
184 0..=1 => parse_offset_key(bytes),
185 _ => Ok((&[], ConsumerOffsetsMessageKey::GroupMetadata))
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time probe: instantiating it only type-checks when `T`
    /// satisfies both `Clone` and `ToOwned`.
    fn assert_clone<T>()
    where
        T: Clone + ToOwned,
    {
    }

    /// The public message types must stay cloneable.
    #[test]
    fn is_cloneable() {
        assert_clone::<ConsumerOffsetsMessageKey>();
        assert_clone::<OffsetCommitValue>();
    }
}