kafka_offsets_parser/
lib.rs

1use std::borrow::Cow;
2use std::convert::TryFrom;
3
4use thiserror::Error;
5
6use nom::number::complete::{be_u16, be_i32, be_u64, be_u32};
7use nom::multi::length_data;
8use nom::sequence::tuple;
9use nom::error::{ErrorKind, ParseError};
10
11type IResult<I, O> = nom::IResult<I, O, ConsumerOffsetsMessageParseError<I>>;
12
13/// Error type for our parsers
14#[derive(Error, Debug)]
15pub enum ConsumerOffsetsMessageParseError<I: std::fmt::Debug> {
16    #[error("invalid utf8 sequence")]
17    FromUtf8Error(#[from] std::str::Utf8Error),
18    #[error("parsing error")]
19    Nom(I, ErrorKind),
20    /// Ran out of data
21    #[error("insufficient data")]
22    Incomplete,
23}
24
25impl<I: std::fmt::Debug> ParseError<I> for ConsumerOffsetsMessageParseError<I> {
26  fn from_error_kind(input: I, kind: ErrorKind) -> Self {
27    ConsumerOffsetsMessageParseError::Nom(input, kind)
28  }
29
30  fn append(_: I, _: ErrorKind, other: Self) -> Self {
31    other
32  }
33}
34
/// Decoded record value that accompanies an `OffsetKey`.
#[derive(Clone, Debug)]
pub struct OffsetCommitValue<'a> {
    /// Schema version read from the start of the value.
    pub version: u16,
    /// The committed offset.
    pub offset: u64,
    /// Leader epoch, when the decoded layout carries one.
    pub leader_epoch: Option<u32>,
    /// Metadata string stored with the commit; read it through [`Self::metadata`].
    metadata: Cow<'a, str>,
    /// Commit timestamp as stored in the record.
    pub commit_timestamp: u64,
    /// Expiry timestamp, when the decoded layout carries one.
    pub expire_timestamp: Option<u64>,
}

impl OffsetCommitValue<'_> {
    /// Returns the metadata string stored with the commit.
    pub fn metadata(&self) -> &str {
        self.metadata.as_ref()
    }
}
51
52impl<'a> TryFrom<&'a [u8]> for OffsetCommitValue<'a> {
53    type Error = ConsumerOffsetsMessageParseError<&'a [u8]>;
54
55    fn try_from(bytes: &'a [u8]) -> Result<Self, Self::Error> {
56        match parse_offset_commit_value(bytes) {
57            Ok((_, res)) => Ok(res),
58            Err(nom::Err::Error(err)) |
59            Err(nom::Err::Failure(err)) => Err(err),
60            Err(nom::Err::Incomplete(_)) => Err(ConsumerOffsetsMessageParseError::Incomplete),
61        }
62    }
63}
64
65/// The message key for messages on the __consumer_offsets topic
66#[derive(Debug, Clone)]
67pub enum ConsumerOffsetsMessageKey<'a> {
68    /// Key for OffsetCommitValue message
69    Offset(OffsetKey<'a>),
70
71    /// Key for a GroupMetadata message.
72    ///
73    /// We don't actually parse this key, but since it's a possible variant we
74    /// include a tag here so it can be handled properly vs inspecting errors to
75    /// check if parsing failed because we haven't implemented this key type.
76    GroupMetadata,
77}
78
/// Payload of the [`ConsumerOffsetsMessageKey::Offset`] variant.
#[derive(Clone, Debug)]
pub struct OffsetKey<'a> {
    /// Consumer group name; read it through [`Self::group`].
    group: Cow<'a, str>,
    /// Topic name; read it through [`Self::topic`].
    topic: Cow<'a, str>,
    /// Partition number of the topic.
    pub partition: i32,
}

impl OffsetKey<'_> {
    /// Returns the consumer group name.
    pub fn group(&self) -> &str {
        self.group.as_ref()
    }

    /// Returns the topic name.
    pub fn topic(&self) -> &str {
        self.topic.as_ref()
    }
}
96
97impl<'a> TryFrom<&'a [u8]> for ConsumerOffsetsMessageKey<'a> {
98    type Error = ConsumerOffsetsMessageParseError<&'a [u8]>;
99
100    fn try_from(bytes: &'a [u8]) -> Result<Self, Self::Error> {
101        match parse_consumer_offsets_message_key(bytes) {
102            Ok((_, res)) => Ok(res),
103            Err(nom::Err::Error(err)) |
104            Err(nom::Err::Failure(err)) => Err(err),
105            Err(nom::Err::Incomplete(_)) => Err(ConsumerOffsetsMessageParseError::Incomplete),
106        }
107    }
108}
109
110// -------------------------
111// nom parsing functions
112// -------------------------
113
114/// takes first two bytes as big endian u16 length; then uses that to parse
115/// utf-8 string.
116fn length_str(bytes: &[u8]) -> IResult<&[u8], &str> {
117    let (bytes, sbuf) = length_data(be_u16)(bytes)?;
118    match std::str::from_utf8(sbuf) {
119        Ok(s) => Ok((bytes, s)),
120        Err(e) => Err(nom::Err::Error(From::from(e)))
121    }
122}
123
124fn parse_offset_commit_value0(bytes: &[u8]) -> IResult<&[u8], OffsetCommitValue> {
125    let (bytes, (offset, metadata, commit_timestamp)) = tuple((be_u64, length_str, be_u64))(bytes)?;
126    Ok((bytes, OffsetCommitValue {
127        version: 0,
128        offset,
129        metadata: Cow::Borrowed(metadata),
130        commit_timestamp,
131        expire_timestamp: None,
132        leader_epoch: None,
133    }))
134}
135
136fn parse_offset_commit_value1(bytes: &[u8]) -> IResult<&[u8], OffsetCommitValue> {
137    let (bytes, (offset, metadata, commit_timestamp, expire_timestamp)) = tuple((be_u64, length_str, be_u64, be_u64))(bytes)?;
138    Ok((bytes, OffsetCommitValue {
139        version: 1,
140        offset,
141        metadata: Cow::Borrowed(metadata),
142        commit_timestamp,
143        expire_timestamp: Some(expire_timestamp),
144        leader_epoch: None,
145    }))
146}
147
148fn parse_offset_commit_value3(bytes: &[u8]) -> IResult<&[u8], OffsetCommitValue> {
149    let (bytes, (offset, leader_epoch, metadata, commit_timestamp)) = tuple((be_u64, be_u32, length_str, be_u64))(bytes)?;
150    Ok((bytes, OffsetCommitValue {
151        version: 3,
152        offset,
153        metadata: Cow::Borrowed(metadata),
154        commit_timestamp,
155        leader_epoch: if leader_epoch == u32::MAX { None } else { Some(leader_epoch) },
156        expire_timestamp: None,
157    }))
158}
159
160fn parse_offset_commit_value(bytes: &[u8]) -> IResult<&[u8], OffsetCommitValue> {
161    let (bytes, version) = be_u16(bytes)?;
162    match version {
163        0     => parse_offset_commit_value0(bytes),
164        1..=2 => parse_offset_commit_value1(bytes),
165        3     => parse_offset_commit_value3(bytes),
166        _ => Err(nom::Err::Error(ConsumerOffsetsMessageParseError::Nom(bytes, ErrorKind::Fail)))
167    }
168}
169
170fn parse_offset_key(bytes: &[u8]) -> IResult<&[u8], ConsumerOffsetsMessageKey> {
171    let (bytes, (group, topic, partition)) = tuple((length_str, length_str, be_i32))(bytes)?;
172
173    let offset_key = OffsetKey {
174        group: Cow::Borrowed(group),
175        topic: Cow::Borrowed(topic),
176        partition
177    };
178    Ok((bytes, ConsumerOffsetsMessageKey::Offset(offset_key)))
179}
180
181fn parse_consumer_offsets_message_key(bytes: &[u8]) -> IResult<&[u8], ConsumerOffsetsMessageKey> {
182    let (bytes, version) = be_u16(bytes)?;
183    match version {
184        0..=1 => parse_offset_key(bytes),
185        _ => Ok((&[], ConsumerOffsetsMessageKey::GroupMetadata))
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only when `T` is `Clone` (which also provides `ToOwned`).
    fn require_clone<T: Clone>() {}

    /// The public message types must stay cloneable.
    #[test]
    fn is_cloneable() {
        require_clone::<OffsetCommitValue>();
        require_clone::<ConsumerOffsetsMessageKey>();
    }
}