kafka_protocol/lib.rs
1//! Implementation of [the Kafka wire protocol](https://kafka.apache.org/protocol.html) in Rust.
2//!
3//! This library follows the approach of the Kafka project and generates the protocol through
4//! [schema defined in JSON](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message).
5//! This ensures not only compatibility, but easy synchronization when new features are added
6//! upstream.
7//!
8//! The goal of this project is to provide 100% coverage of the Kafka API, as well as basic
9//! utilities for working with the protocol, including compression and record serialization.
10//!
11//! # Messages
12//! The [`messages`] module contains the generated request and response structs. These structs allow
13//! easy serialization and deserialization via [`bytes::Bytes`].
14//!
15//! ```rust
16//! use bytes::BytesMut;
17//! use kafka_protocol::protocol::{encode_request_header_into_buffer, decode_request_header_from_buffer};
18//! use kafka_protocol::messages::RequestHeader;
19//! use kafka_protocol::protocol::{StrBytes, Encodable, Decodable};
20//!
21//! let mut request_header = RequestHeader::default();
22//! request_header.correlation_id = 1;
23//! request_header.client_id = Some(StrBytes::from_static_str("test-client"));
24//! let mut buf = BytesMut::new();
25//! encode_request_header_into_buffer(&mut buf, &request_header).unwrap();
26//! assert_eq!(request_header, decode_request_header_from_buffer(&mut buf).unwrap());
27//! ```
28//! Note that every message implementation of [`Encodable::encode`](crate::protocol::Encodable::encode)
29//! and [`Decodable::decode`](crate::protocol::Decodable::decode) requires a version to be provided
30//! explicitly. This is because every message contains *all* the fields that are valid for every
31//! version. These fields are *not* marked [`Option`], which would represent nullability according
32//! to a message's schema, but rather have a default value that represents a nil value. It is the
33//! user's responsibility to ensure that only valid fields of the decoded message version are used.
34//!
35//! ## Sending a Request
36//!
37//! A request can be created by serializing a [`messages::RequestHeader`] and any given request
38//! type to a [`bytes::Bytes`] buffer.
39//!
40//! ```rust
41//! use kafka_protocol::protocol::{StrBytes, Encodable, HeaderVersion, encode_request_header_into_buffer};
42//! use bytes::{BytesMut, Bytes};
43//! use kafka_protocol::messages::{RequestHeader, ApiKey, ApiVersionsRequest};
44//! # use std::error::Error;
45//!
46//! let mut buf = BytesMut::new();
47//! let mut req_header = RequestHeader::default();
48//! req_header.request_api_version = 3;
49//! req_header.request_api_key = ApiKey::ApiVersions as i16;
50//! req_header.client_id = Some(StrBytes::from_static_str("example"));
51//! encode_request_header_into_buffer(&mut buf, &req_header).unwrap();
52//! let mut api_versions_req = ApiVersionsRequest::default();
53//! api_versions_req.client_software_version = StrBytes::from_static_str("1.0");
54//! api_versions_req.client_software_name = StrBytes::from_static_str("example-client");
55//! api_versions_req.encode(&mut buf, req_header.request_api_version);
56//!
57//! # fn send_request(buf: &[u8]) -> Result<(), Box<dyn Error>> {
58//! # Ok(())
59//! # }
60//!
61//! // implemented elsewhere
62//! send_request(&buf[..]);
63//! ```
64//!
65//! ## Deserializing an Unknown Request
66//!
67//! The [`messages`] module provides the enums [`messages::RequestKind`], [`messages::ResponseKind`],
68//! and [`messages::ApiKey`] for matching on unknown requests and responses.
69//!
70//! A simple example for decoding an unknown message encoded in `buf`:
71//! ```rust
72//! use kafka_protocol::messages::{RequestHeader, ApiVersionsRequest, ApiKey, RequestKind};
73//! use kafka_protocol::protocol::{Encodable, Decodable, StrBytes, HeaderVersion, decode_request_header_from_buffer, encode_request_header_into_buffer};
74//! use bytes::{BytesMut, Buf};
75//! use std::convert::TryFrom;
76//! use kafka_protocol::protocol::buf::ByteBuf;
77//! # let mut buf = BytesMut::new();
78//! # let mut req_header = RequestHeader::default();
79//! # req_header.request_api_version = 3;
80//! # req_header.request_api_key = ApiKey::ApiVersions as i16;
81//! # req_header.client_id = Some(StrBytes::from_static_str("example"));
82//! # encode_request_header_into_buffer(&mut buf, &req_header).unwrap();
83//! # let mut api_versions_req = ApiVersionsRequest::default();
84//! # api_versions_req.client_software_version = StrBytes::from_static_str("1.0");
85//! # api_versions_req.client_software_name = StrBytes::from_static_str("example-client");
86//! # api_versions_req.encode(&mut buf, 3).unwrap();
87//!
88//!
89//! let header = decode_request_header_from_buffer(&mut buf).unwrap();
90//! let api_key = ApiKey::try_from(header.request_api_version);
91//! let req = match api_key {
92//! ApiVersionsKey => RequestKind::ApiVersions(ApiVersionsRequest::decode(&mut buf, header.request_api_version).unwrap()),
93//! };
94//!
95//! // match on enum elsewhere and do work
96//! match req {
97//! RequestKind::ApiVersions(req) => {
98//! assert_eq!(req.client_software_name.to_string(), "example-client".to_string());
99//! }
100//! _ => panic!()
101//! }
102//! ```
103#![deny(missing_docs)]
104// Display required features for items when rendering for docs.rs
105#![cfg_attr(docsrs, feature(doc_auto_cfg))]
106
107pub mod compression;
108pub mod error;
109#[allow(clippy::all)]
110pub mod messages;
111pub mod protocol;
112pub mod records;
113
114pub use error::ResponseError;
115pub use indexmap;