kafka_protocol/lib.rs
1//! Implementation of [the Kafka wire protocol](https://kafka.apache.org/protocol.html) in Rust.
2//!
3//! This library follows the approach of the Kafka project and generates the protocol through
4//! [schema defined in JSON](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message).
5//! This ensures not only compatibility, but easy synchronization when new features are added
6//! upstream.
7//!
8//! The goal of this project is to provide 100% coverage of the Kafka API, as well as basic
9//! utilities for working with the protocol, including compression and record serialization.
10//!
11//! # Messages
12//! The [`messages`] module contains the generated request and response structs. These structs allow
13//! easy serialization and deserialization via [`bytes::Bytes`].
14//!
15//! ```rust
16//! use bytes::{Bytes, BytesMut};
17//! use kafka_protocol::messages::RequestHeader;
18//! use kafka_protocol::protocol::{StrBytes, Encodable, Decodable};
19//!
20//! let mut request_header = RequestHeader::default();
21//! request_header.correlation_id = 1;
22//! request_header.client_id = Some(StrBytes::from_static_str("test-client"));
23//! let mut buf = BytesMut::new();
24//! request_header.encode(&mut buf, 2);
25//! assert_eq!(request_header, RequestHeader::decode(&mut buf, 2).unwrap());
26//! ```
27//! Note that every message implementation of [`Encodable::encode`](crate::protocol::Encodable::encode)
28//! and [`Decodable::decode`](crate::protocol::Decodable::decode) requires a version to be provided
29//! explicitly. This is because every message contains *all* the fields that are valid for every
30//! version. These fields are *not* marked [`Option`], which would represent nullability according
31//! to a message's schema, but rather have a default value that represents a nil value. It is the
32//! user's responsibility to ensure that only valid fields of the decoded message version are used.
33//!
34//! ## Sending a Request
35//!
36//! A request can be created by serializing a [`messages::RequestHeader`] and any given request
37//! type to a [`bytes::Bytes`] buffer.
38//!
39//! ```rust
40//! use kafka_protocol::protocol::{StrBytes, Encodable, HeaderVersion};
41//! use bytes::{BytesMut, Bytes};
42//! use kafka_protocol::messages::{RequestHeader, ApiKey, ApiVersionsRequest};
43//! # use std::error::Error;
44//!
45//! let mut buf = BytesMut::new();
46//! let mut req_header = RequestHeader::default();
47//! req_header.request_api_version = 3;
48//! req_header.request_api_key = ApiKey::ApiVersions as i16;
49//! req_header.client_id = Some(StrBytes::from_static_str("example"));
50//! req_header.encode(&mut buf, ApiVersionsRequest::header_version(req_header.request_api_version)).unwrap();
51//! let mut api_versions_req = ApiVersionsRequest::default();
52//! api_versions_req.client_software_version = StrBytes::from_static_str("1.0");
53//! api_versions_req.client_software_name = StrBytes::from_static_str("example-client");
54//! api_versions_req.encode(&mut buf, req_header.request_api_version);
55//!
56//! # fn send_request(buf: &[u8]) -> Result<(), Box<dyn Error>> {
57//! # Ok(())
58//! # }
59//!
60//! // implemented elsewhere
61//! send_request(&buf[..]);
62//! ```
63//!
64//! ## Deserializing an Unknown Request
65//!
66//! The [`messages`] module provides the enums [`messages::RequestKind`], [`messages::ResponseKind`],
67//! and [`messages::ApiKey`] for matching on unknown requests and responses.
68//!
69//! A simple example for decoding an unknown message encoded in `buf`:
70//! ```rust
71//! use kafka_protocol::messages::{RequestHeader, ApiVersionsRequest, ApiKey, RequestKind};
72//! use kafka_protocol::protocol::{Encodable, Decodable, StrBytes, HeaderVersion};
73//! use bytes::{BytesMut, Buf};
74//! use std::convert::TryFrom;
75//! use kafka_protocol::protocol::buf::ByteBuf;
76//! # let mut buf = BytesMut::new();
77//! # let mut req_header = RequestHeader::default();
78//! # req_header.request_api_version = 3;
79//! # req_header.request_api_key = ApiKey::ApiVersions as i16;
80//! # req_header.client_id = Some(StrBytes::from_static_str("example"));
81//! # req_header.encode(&mut buf, ApiVersionsRequest::header_version(req_header.request_api_version)).unwrap();
82//! # let mut api_versions_req = ApiVersionsRequest::default();
83//! # api_versions_req.client_software_version = StrBytes::from_static_str("1.0");
84//! # api_versions_req.client_software_name = StrBytes::from_static_str("example-client");
85//! # api_versions_req.encode(&mut buf, 3);
86//!
87//! let api_key = buf.peek_bytes(0..2).get_i16();
88//! let api_version = buf.peek_bytes(2..4).get_i16();
89//! let header_version = ApiKey::try_from(api_key).unwrap().request_header_version(api_version);
90//!
91//! let header = RequestHeader::decode(&mut buf, header_version).unwrap();
92//! let api_key = ApiKey::try_from(header.request_api_version);
93//! let req = match api_key {
94//! ApiVersionsKey => RequestKind::ApiVersions(ApiVersionsRequest::decode(&mut buf, header.request_api_version).unwrap()),
95//! };
96//!
97//! // match on enum elsewhere and do work
98//! match req {
99//! RequestKind::ApiVersions(req) => {
100//! assert_eq!(req.client_software_name.to_string(), "example-client".to_string());
101//! }
102//! _ => panic!()
103//! }
104//! ```
105#![deny(missing_docs)]
106// Display required features for items when rendering for docs.rs
107#![cfg_attr(docsrs, feature(doc_auto_cfg))]
108
109pub mod compression;
110pub mod error;
111#[allow(clippy::all)]
112pub mod messages;
113pub mod protocol;
114pub mod records;
115
116pub use error::ResponseError;
117pub use indexmap;