Crate kafka_protocol
Implementation of the Kafka wire protocol in Rust.
This library follows the approach of the Kafka project and generates the protocol implementation from schemas defined in JSON. This ensures compatibility and makes it easy to stay in sync when new features are added upstream.
The goal of this project is to provide 100% coverage of the Kafka API, as well as basic utilities for working with the protocol, including compression and record serialization.
Messages
The messages module contains the generated request and response structs. These structs allow
easy serialization and deserialization via bytes::Bytes.
use bytes::{Bytes, BytesMut};
use kafka_protocol::messages::RequestHeader;
use kafka_protocol::protocol::{StrBytes, Encodable, Decodable};
let mut request_header = RequestHeader::default();
request_header.correlation_id = 1;
request_header.client_id = Some(StrBytes::from_str("test-client"));
let mut buf = BytesMut::new();
request_header.encode(&mut buf, 3).unwrap();
assert_eq!(request_header, RequestHeader::decode(&mut buf, 3).unwrap());
Note that every message implementation of Encodable::encode
and Decodable::decode requires a version to be provided
explicitly. This is because every message struct contains all the fields that are valid in any
version. Fields that are absent in a given version are not wrapped in Option, which is reserved
for fields the message's schema marks as nullable; instead they hold a default value that
represents a nil value. It is the user's responsibility to ensure that only fields valid for
the decoded message version are used.
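The version-gated layout described above can be illustrated with a small std-only sketch. `ToyHeader` and its wire format are hypothetical and are not types or encodings from kafka_protocol; the sketch only shows why a field that is not valid in the chosen version comes back as its default value rather than as an Option.

```rust
// Hypothetical toy message, NOT a kafka_protocol type. It carries every field
// that exists in any version, as the generated structs are described to do.
#[derive(Debug, Default, PartialEq)]
struct ToyHeader {
    correlation_id: i32,
    // Assumed to exist on the wire only from version 1 onward.
    // An empty string stands in for "not present".
    client_id: String,
}

impl ToyHeader {
    // Writes only the fields that are valid for `version`.
    fn encode(&self, buf: &mut Vec<u8>, version: i16) {
        buf.extend_from_slice(&self.correlation_id.to_be_bytes());
        if version >= 1 {
            let bytes = self.client_id.as_bytes();
            // Toy length prefix; strings longer than u16::MAX are not handled.
            buf.extend_from_slice(&(bytes.len() as u16).to_be_bytes());
            buf.extend_from_slice(bytes);
        }
    }

    // Reads only the fields that are valid for `version`; the rest keep their defaults.
    // Returns None if the buffer is too short or the string is not valid UTF-8.
    fn decode(buf: &[u8], version: i16) -> Option<ToyHeader> {
        let id = buf.get(0..4)?;
        let mut header = ToyHeader {
            correlation_id: i32::from_be_bytes([id[0], id[1], id[2], id[3]]),
            ..Default::default()
        };
        if version >= 1 {
            let len_bytes = buf.get(4..6)?;
            let len = u16::from_be_bytes([len_bytes[0], len_bytes[1]]) as usize;
            let raw = buf.get(6..6 + len)?;
            header.client_id = String::from_utf8(raw.to_vec()).ok()?;
        }
        Some(header)
    }
}
```

Encoding a header with client_id set to "test-client" at version 0 drops the field, so decoding those bytes at version 0 yields an empty client_id. Nothing in the decoded struct marks the field as absent, which is why the caller has to know which fields the version actually carries.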
Sending a Request
A request can be created by serializing a messages::RequestHeader and any given request
type to a bytes::Bytes buffer.
use kafka_protocol::protocol::{StrBytes, Encodable};
use bytes::{BytesMut, Bytes};
use kafka_protocol::messages::{RequestHeader, ApiKey, ApiVersionsRequest};
let mut buf = BytesMut::new();
let mut req_header = RequestHeader::default();
req_header.request_api_version = 3;
req_header.request_api_key = ApiKey::ApiVersionsKey as i16;
req_header.client_id = Some(StrBytes::from_str("example"));
req_header.encode(&mut buf, 2).unwrap();
let mut api_versions_req = ApiVersionsRequest::default();
api_versions_req.client_software_version = StrBytes::from_str("1.0");
api_versions_req.client_software_name = StrBytes::from_str("example-client");
api_versions_req.encode(&mut buf, 3).unwrap();
// implemented elsewhere
send_request(&buf[..]);
Deserializing an Unknown Request
The messages module provides the enums messages::RequestKind, messages::ResponseKind,
and messages::ApiKey for matching on unknown requests and responses.
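Matching on an unknown request depends on the fixed start of the Kafka request header: a big-endian 16-bit API key followed by a big-endian 16-bit API version. The std-only sketch below peeks at those four bytes without consuming the buffer and dispatches on the key. `peek_key_and_version`, `ToyRequestKind`, and `classify` are hypothetical names for illustration and are not part of kafka_protocol; the only protocol facts assumed are the header layout and that ApiVersions uses API key 18.

```rust
// Reads (api_key, api_version) from the first four bytes of an encoded request
// header without advancing the buffer. Returns None if fewer than four bytes exist.
fn peek_key_and_version(buf: &[u8]) -> Option<(i16, i16)> {
    let head = buf.get(0..4)?;
    let api_key = i16::from_be_bytes([head[0], head[1]]);
    let api_version = i16::from_be_bytes([head[2], head[3]]);
    Some((api_key, api_version))
}

// Hypothetical stand-in for an enum of known request kinds.
#[derive(Debug, PartialEq)]
enum ToyRequestKind {
    ApiVersions { version: i16 },
    Unsupported { api_key: i16 },
}

// Dispatches on the API key; the API version is carried along so the body
// could later be decoded with the right version.
fn classify(buf: &[u8]) -> Option<ToyRequestKind> {
    let (api_key, version) = peek_key_and_version(buf)?;
    Some(match api_key {
        18 => ToyRequestKind::ApiVersions { version }, // ApiVersions API key
        other => ToyRequestKind::Unsupported { api_key: other },
    })
}
```

For a buffer starting with the bytes 0, 18, 0, 3, `classify` reports an ApiVersions request at version 3. Any other key falls through to `Unsupported` instead of panicking, which tends to be the safer default when the peer may speak a newer protocol.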
A simple example for decoding an unknown message encoded in buf:
use kafka_protocol::messages::{RequestHeader, ApiVersionsRequest, ApiKey, RequestKind};
use kafka_protocol::protocol::{Encodable, Decodable, StrBytes};
use bytes::{BytesMut, Buf};
use std::convert::TryFrom;
use kafka_protocol::protocol::buf::ByteBuf;
// The API version is the second field in the request header (bytes 2..4).
let version = buf.peek_bytes(2..4).get_i16();
let header = RequestHeader::decode(&mut buf, version).unwrap();
// Dispatch on the API key, not on the API version.
let api_key = ApiKey::try_from(header.request_api_key).unwrap();
let req = match api_key {
    ApiKey::ApiVersionsKey => RequestKind::ApiVersionsRequest(ApiVersionsRequest::decode(&mut buf, header.request_api_version).unwrap()),
    // Other API keys are not handled in this example.
    _ => panic!("unsupported API key"),
};
// match on enum elsewhere and do work
match req {
    RequestKind::ApiVersionsRequest(req) => {
        assert_eq!(req.client_software_name.to_string(), "example-client".to_string());
    }
    _ => panic!()
}
Re-exports
- pub use error::ResponseError;
Modules
- compression: Provides compression utilities for encoding records.
- error: Error kinds for Kafka error codes.
- messages: Messages used by the Kafka protocol.
- protocol: Most types are used internally in encoding/decoding and are not required for typical use of the protocol. However, they can be used for decoding partial messages or rewriting parts of an encoded message.
- records: Provides utilities for working with records (Kafka messages).