//! # Konsumer_Offsets: parse Kafka `__consumer_offsets`
//!
//! A library crate to parse the content of the [Kafka] [`__consumer_offsets`] internal topic.
//!
//! This was written by reverse-engineering the parsing logic used in [Kafka], looking at both
//! [Group Coordinator] Broker and [Consumer Client] source code.
//!
//! An **extra perk** of this crate is that it also serves a didactic purpose: it attempts
//! to document really clearly what each struct and field represents, what it means in
//! the context of Kafka's _"coordinated consumption of messages"_, and how it was parsed.
//!
//! ## Features
//!
//! * `ts_int`: use `i64` to represent Unix Timestamps (conflicts with `ts_chrono` and `ts_time`)
//! * `ts_chrono`: use [`chrono::DateTime<Utc>`] to represent Unix Timestamps (conflicts with `ts_int` and `ts_time`)
//! * `ts_time`: use [`time::OffsetDateTime`] to represent Unix Timestamps (conflicts with `ts_chrono` and `ts_int`)
//! * `serde`: support serialization/deserialization for all types exposed by this crate via the [serde] crate
//!
//! Default features: `ts_int`.
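//!
//! As a sketch of what the `ts_*` features imply (the `Timestamp` alias below is
//! purely illustrative, not this crate's actual API), the enabled feature decides
//! the concrete Rust type used for every timestamp field:
//!
//! ```ignore
//! // Hypothetical illustration: exactly one of these aliases is active,
//! // depending on which (mutually exclusive) `ts_*` feature is enabled.
//! #[cfg(feature = "ts_int")]
//! type Timestamp = i64;
//! #[cfg(feature = "ts_chrono")]
//! type Timestamp = chrono::DateTime<chrono::Utc>;
//! #[cfg(feature = "ts_time")]
//! type Timestamp = time::OffsetDateTime;
//! ```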
//!
//! ## Before you dive in
//!
//! What this crate does, and what data it can parse, will make
//! sense only if you have read about how Kafka tracks Consumer Offsets, and especially what
//! part the Consumer [Group Coordinator] plays in that. A _great_ resource to learn (or refresh)
//! how the [Consumer Group Protocol] works can be found on the [Confluent Developer] website.
//!
//! Also, as it will be referenced when documenting the data itself, it is important to know
//! that [`__consumer_offsets`] uses [Log Compaction]: this means that it follows all the compaction
//! rules, including the handling of _tombstone_ messages, used to "mark for deletion" all messages
//! with a given key.
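//!
//! In practice, a tombstone surfaces as a message that has a key but no payload.
//! A minimal sketch (the variable names are hypothetical):
//!
//! ```ignore
//! // A record whose payload is `None` is a tombstone: it "deletes" all
//! // previously compacted records that share the same key.
//! let is_tombstone = key_bytes.is_some() && payload_bytes.is_none();
//! ```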
//!
//! ## Example
//!
//! Here is how the code should be structured to handle the flow of messages coming out of
//! the [`__consumer_offsets`] topic.
//!
//! ```
//! use konsumer_offsets::KonsumerOffsetsData;
//!
//! fn consume_consumer_offsets_message(k_bytes: Option<Vec<u8>>, p_bytes: Option<Vec<u8>>) {
//!     match KonsumerOffsetsData::try_from_bytes_vec(k_bytes, p_bytes) {
//!         Ok(kod) => {
//!             match kod {
//!                 KonsumerOffsetsData::OffsetCommit(offset_commit) => {
//!                     /* ... a consumer committed an offset for a partition in a topic ... */
//!                 }
//!                 KonsumerOffsetsData::GroupMetadata(group_metadata) => {
//!                     /* ... a consumer joined or left the group ... */
//!                 }
//!             }
//!         }
//!         Err(e) => {
//!             /* ... handle parsing error ... */
//!         }
//!     }
//! }
//! ```
//!
//! ## 1 topic, 2 different data types
//!
//! The messages (a.k.a. records) in [`__consumer_offsets`] are encoded using a _bespoke_ binary
//! protocol, and each message can contain 1 of 2 possible data types (at least as per Kafka 3):
//!
//! * [`OffsetCommit`]
//! * [`GroupMetadata`]
//!
//! Which data type is contained in a message is determined by parsing the first 2-byte integer,
//! which carries a `message_version` code: this is amongst the fields of both structs.
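//!
//! As a minimal sketch (not this crate's internals), the discriminating
//! `message_version` can be read like this:
//!
//! ```
//! // The message key starts with a big-endian i16 `message_version`:
//! // in Kafka 3, key versions 0 and 1 indicate an offset commit,
//! // while version 2 indicates group metadata.
//! fn message_version(key_bytes: &[u8]) -> Option<i16> {
//!     let bytes: [u8; 2] = key_bytes.get(..2)?.try_into().ok()?;
//!     Some(i16::from_be_bytes(bytes))
//! }
//!
//! assert_eq!(message_version(&[0x00, 0x02, 0xAB]), Some(2));
//! ```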
//!
//! ### [`OffsetCommit`] a.k.a. "where is the consumer at?"
//!
//! Kafka uses code generation to materialise [`OffsetCommit`] into Java code:
//! it is composed of 2 JSON definitions that, at compile time, get turned into Java classes:
//! [`OffsetCommitKey`] and [`OffsetCommitValue`].
//!
//! Assuming you are familiar with the [Consumer Group Protocol] (as discussed above): this
//! is what the Group Coordinator stores into the [`__consumer_offsets`] topic when a consumer,
//! assigned a specific _partition_, sends a request to the coordinator to _commit_ the _offset_
//! up to which it has consumed. It's _essentially_ informing the group coordinator that:
//!
//! > "I consumed this partition for this topic up to this offset".
//!
//! The information stored in [`OffsetCommit`] can be used to track
//! _who has consumed what until now_. Refer to its own documentation for further details.
//!
//! ### [`GroupMetadata`] a.k.a. "what are the members of this group?"
//!
//! Similarly to [`OffsetCommit`], Kafka uses code generation to materialise [`GroupMetadata`]
//! into Java code: it is composed of 2 JSON definitions that, at compile time,
//! get turned into Java classes: [`GroupMetadataKey`] and [`GroupMetadataValue`].
//!
//! [`GroupMetadata`] contains the current state of a consumer group, and it is used by the [Group
//! Coordinator] to track "which consumer is subscribed to what topic" and "which consumer
//! is assigned which partition".
//!
//! Compared to [`OffsetCommit`], [`GroupMetadata`] appears _relatively infrequently_ in
//! [`__consumer_offsets`]: this is because it's usually produced when consumers join or leave
//! groups.
//!
//! ### [`KonsumerOffsetsData`] i.e. "making it rusty"
//!
//! The entry point to this crate is [`KonsumerOffsetsData`]: this is an enum where each variant
//! wraps one of the 2 possible data types seen above, as they are parsed from bytes coming
//! out of [`__consumer_offsets`].
//!
//! For convenience we provide 2 `try_from_*` factory methods:
//!
//! * [`KonsumerOffsetsData::try_from_bytes`]
//! * [`KonsumerOffsetsData::try_from_bytes_vec`]
//!
//! This is because some Kafka clients might return the `(key, payload)` _tuple_ as `&[u8]`
//! slices, others as `Vec<u8>`.
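//!
//! For instance, with a client that yields borrowed slices, the non-`Vec` variant can be
//! used directly (a sketch, assuming its signature mirrors
//! [`KonsumerOffsetsData::try_from_bytes_vec`] but over `Option<&[u8]>`):
//!
//! ```ignore
//! // `key` and `payload` are hypothetical `Option<&[u8]>` values borrowed from the client.
//! let kod = KonsumerOffsetsData::try_from_bytes(key, payload)?;
//! ```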
//!
//! ## A few words about parsing Kafka _entrails_
//!
//! Kafka runs on the JVM, so it's limited to what the JVM supports.
//!
//! This crate deals with these limitations: the data it parses, and the fields used to
//! store it, are a reflection of what Java supports. For example, in places where one would expect
//! an unsigned integer, like a string length or an ever-increasing identifier, we use
//! the signed integers that Java supports.
//!
//! ### Java Primitive types in Rust
//!
//! Java uses a more limited set of [Primitive Types] for scalar values, compared to Rust.
//! Fortunately, Rust's offering is large enough that it is possible to map every Java type
//! to a Rust type:
//!
//! | Java | Rust |
//! |:---------:|:------------------:|
//! | `short` | `i16` |
//! | `int` | `i32` |
//! | `long` | `i64` |
//! | `float` | `f32` |
//! | `double` | `f64` |
//! | `boolean` | `u8` |
//! | `char` | `char(u16 as u32)` |
//!
//! As an additional limitation, Java only supports Big Endian encoding.
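//!
//! For instance, decoding a Java `int` therefore always means reading 4 bytes
//! in Big Endian order:
//!
//! ```
//! // 0x0000012A, read Big Endian, is the Java `int` 298.
//! let raw: [u8; 4] = [0x00, 0x00, 0x01, 0x2A];
//! assert_eq!(i32::from_be_bytes(raw), 298);
//! ```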
//!
//! ### Bespoke formats used by `__consumer_offsets`
//!
//! In addition to fields of the primitive types above,
//! messages of this internal topic also contain two more structured types:
//! _string_ and _vector of bytes_.
//!
//! #### Type: _string_
//!
//! 2 bytes are used to store the string length as a Java `short`,
//! followed by a UTF-8 encoded string of that length:
//!
//! ```text
//! 0 8 16 ... 8 x LENGTH
//! +-8-bits-+-8-bits-+-----------------------+
//! | 1 byte | 1 byte | ...LENGTH x bytes... |
//! +-----------------------------------------+
//! | i16: LENGTH | UTF8_STRING |
//! +-----------------------------------------+
//! ```
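//!
//! A minimal sketch of a parser for this format (not this crate's actual API),
//! assuming the bytes are consumed from a [`std::io::Cursor`]:
//!
//! ```
//! use std::io::{Cursor, Read};
//!
//! // Reads a big-endian i16 length, then that many UTF-8 bytes.
//! // A negative length is how Kafka encodes a null string.
//! fn parse_string(cursor: &mut Cursor<&[u8]>) -> Option<String> {
//!     let mut len_buf = [0u8; 2];
//!     cursor.read_exact(&mut len_buf).ok()?;
//!     let len = i16::from_be_bytes(len_buf);
//!     if len < 0 {
//!         return None;
//!     }
//!     let mut str_buf = vec![0u8; len as usize];
//!     cursor.read_exact(&mut str_buf).ok()?;
//!     String::from_utf8(str_buf).ok()
//! }
//!
//! let bytes = [0x00, 0x05, b'k', b'a', b'f', b'k', b'a'];
//! let mut cursor = Cursor::new(&bytes[..]);
//! assert_eq!(parse_string(&mut cursor), Some("kafka".to_string()));
//! ```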
//!
//! #### Type: _vector of bytes_
//!
//! 4 bytes are used to store the vector length as a Java `int`,
//! followed by a sequence of bytes of that length:
//!
//! ```text
//! 0 8 16 24 32 ... 8 x LENGTH
//! +-8-bits-+-8-bits-+-8-bits-+-8-bits-+-----------------------+
//! | 1 byte | 1 byte | 1 byte | 1 byte | ...LENGTH x bytes... |
//! +-----------------------------------------------------------+
//! | i32: LENGTH | BYTES_VECTOR |
//! +-----------------------------------------------------------+
//! ```
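//!
//! And a matching sketch for the _vector of bytes_ (again, illustrative only):
//!
//! ```
//! use std::io::{Cursor, Read};
//!
//! // Reads a big-endian i32 length, then that many raw bytes.
//! // A negative length is how Kafka encodes a null value.
//! fn parse_bytes(cursor: &mut Cursor<&[u8]>) -> Option<Vec<u8>> {
//!     let mut len_buf = [0u8; 4];
//!     cursor.read_exact(&mut len_buf).ok()?;
//!     let len = i32::from_be_bytes(len_buf);
//!     if len < 0 {
//!         return None;
//!     }
//!     let mut buf = vec![0u8; len as usize];
//!     cursor.read_exact(&mut buf).ok()?;
//!     Some(buf)
//! }
//!
//! let bytes = [0x00, 0x00, 0x00, 0x02, 0xCA, 0xFE];
//! let mut cursor = Cursor::new(&bytes[..]);
//! assert_eq!(parse_bytes(&mut cursor), Some(vec![0xCA, 0xFE]));
//! ```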
//!
//! ## Disclaimer
//!
//! [Confluent] is a great place to start if you are looking for a commercial Kafka solution,
//! including PaaS, SaaS, training and more. **BUT** it is completely unrelated to this
//! crate, and any reference to it is purely educational.
//!
//! [Primitive Types]: https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html
//! [`__consumer_offsets`]: https://kafka.apache.org/documentation/#impl_offsettracking
//! [Kafka]: https://kafka.apache.org/
//! [Consumer Group Protocol]: https://developer.confluent.io/learn-kafka/architecture/consumer-group-protocol/
//! [Confluent Developer]: https://developer.confluent.io/
//! [Confluent]: https://www.confluent.io/about/
//! [`OffsetCommitKey`]: https://github.com/apache/kafka/blob/trunk/core/src/main/resources/common/message/OffsetCommitKey.json
//! [`OffsetCommitValue`]: https://github.com/apache/kafka/blob/trunk/core/src/main/resources/common/message/OffsetCommitValue.json
//! [`GroupMetadataKey`]: https://github.com/apache/kafka/blob/trunk/core/src/main/resources/common/message/GroupMetadataKey.json
//! [`GroupMetadataValue`]: https://github.com/apache/kafka/blob/trunk/core/src/main/resources/common/message/GroupMetadataValue.json
//! [Log Compaction]: https://kafka.apache.org/documentation/#compaction
//! [Group Coordinator]: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
//! [Consumer Client]: https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/clients/consumer
//! [`chrono::DateTime<Utc>`]: https://docs.rs/chrono/latest/chrono/struct.DateTime.html#method.from_utc
//! [`time::OffsetDateTime`]: https://time-rs.github.io/api/time/struct.OffsetDateTime.html#method.from_unix_timestamp_nanos
//! [serde]: https://crates.io/crates/serde
//!
// NOTE: the module paths below were elided in the original source; the names
// used here are assumed from the crate layout implied by the documentation above.
mod errors;
mod group_metadata;
mod konsumer_offsets_data;
mod offset_commit;

pub use errors::*;
pub use group_metadata::*;
pub use konsumer_offsets_data::*;
pub use offset_commit::*;

// The `ts_*` features are mutually exclusive (see "Features" above):
// reject any build that enables more than one of them.
#[cfg(any(
    all(feature = "ts_int", feature = "ts_chrono"),
    all(feature = "ts_int", feature = "ts_time"),
    all(feature = "ts_chrono", feature = "ts_time"),
))]
compile_error!("Features `ts_int`, `ts_chrono` and `ts_time` are mutually exclusive: enable exactly one.");