Crate kafka_wire_protocol

Source
Expand description

This library is a generated implementation of Apache Kafka wire protocol.

The library contains the protocol messages and some convenience code, but this is not a complete client (or server) implementation.

The message structs and the corresponding serialization and deserialization code are generated based on the Kafka JSON definitions.

Message versions up to Kafka 3.9.0 are supported at the moment. The unsupported message types are ProduceRequest, FetchResponse, FetchSnapshotResponse, and ShareFetchResponse.

§Message schema

Kafka has a number of request-response messages with versions. In the schema module you can find particular message types of particular versions.

§Usage

§Serialization

Writable is implemented for each message structure, so they have the write function. This function takes an Write instance, for example, Cursor.

use std::io::Cursor;
use kafka_wire_protocol::readable_writable::Writable;
use kafka_wire_protocol::schema::api_versions_request::v3::ApiVersionsRequest;

let mut request_cur: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let request = ApiVersionsRequest::new("client", "123");
request.write(&mut request_cur).unwrap();

§Deserialization

Readable is implemented for each message structure, so they have the read function. This function takes an Read instance, for example, Cursor.

use std::io::Cursor;
use kafka_wire_protocol::readable_writable::Readable;
use kafka_wire_protocol::schema::api_versions_response::v3::ApiVersionsResponse;

let mut response_buf = vec![0; 100];  // real buffer comes e.g. from network
let mut response_cur = Cursor::new(response_buf);
let response = ApiVersionsResponse::read(&mut response_cur).unwrap();

§Request and response header versions

ApiMessageType is an utility struct that helps programmatically get the versions of request and response headers required for the particular message type and version.

use kafka_wire_protocol::api_message_type::ApiMessageType;

let api_message_type: ApiMessageType = ApiMessageType::from_api_key(0);
assert_eq!(api_message_type.request_header_version(1), 1);
assert_eq!(api_message_type.response_header_version(1), 0);

§Example

use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use kafka_wire_protocol::api_message_type::ApiMessageType;
use kafka_wire_protocol::markers::ApiMessage;
use kafka_wire_protocol::readable_writable::{Readable, Writable};
use kafka_wire_protocol::schema::api_versions_request::v3::ApiVersionsRequest;
use kafka_wire_protocol::schema::api_versions_response::v3::ApiVersionsResponse;
use kafka_wire_protocol::schema::request_header::v2::RequestHeader;
use kafka_wire_protocol::schema::response_header::v0::ResponseHeader;
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use std::net::{Shutdown, TcpStream};

// Connect to Kafka.
let mut tcp_stream = TcpStream::connect("127.0.0.1:9092").unwrap();

// Prepare the request buffer.
let mut request_cur: Cursor<Vec<u8>> = Cursor::<Vec<u8>>::new(Vec::new());
request_cur.write_i32::<BigEndian>(0).unwrap(); // size placeholder

let request = ApiVersionsRequest::new("client", "123");
let api_message_type: ApiMessageType = ApiMessageType::from_api_key(request.api_key());

let request_header_version = api_message_type.request_header_version(request.version());
println!("Request header version: {}", request_header_version);
assert_eq!(request_header_version, 2);

let response_header_version = api_message_type.response_header_version(request.version());
println!("Request header version: {}", response_header_version);
assert_eq!(response_header_version, 0);

// Write the header.
let correlation_id = 1234;
let client_id = "test-client";
let request_header = RequestHeader::new(
    request.api_key(),
    request.version(),
    correlation_id,
    Some(client_id),
);
request_header.write(&mut request_cur).unwrap();

// Write the request.
request.write(&mut request_cur).unwrap();

// Write the real size on top of the placeholder.
let size = (request_cur.position() - 4) as i32;
request_cur.seek(SeekFrom::Start(0)).unwrap();
request_cur.write_i32::<BigEndian>(size).unwrap();

// Send the request to Kafka.
tcp_stream.write(request_cur.get_ref()).unwrap();
tcp_stream.flush().unwrap();

// Read the response from the socket.
let response_size = tcp_stream.read_i32::<BigEndian>().unwrap() as usize;
let mut response_buf = vec![0; response_size];
let read_size = tcp_stream.read(&mut response_buf).unwrap();
assert_eq!(read_size, response_size);
let mut response_cur = Cursor::new(response_buf);

// Read the response header.
let response_header = ResponseHeader::read(&mut response_cur).unwrap();
assert_eq!(response_header.correlation_id, correlation_id);
println!("{:?}", response_header);

// Read the response.
let response = ApiVersionsResponse::read(&mut response_cur).unwrap();
println!("{:?}", response);

// Close the socket.
let _ = tcp_stream.shutdown(Shutdown::Both);

Modules§

api_message_type
markers
readable_writable
schema
tagged_fields