A Kafka protocol implementation that performs no I/O (it operates only on bytes)
§Design
Apache Kafka defines each API message with a JSON message descriptor. Each descriptor contains a list of fields together with their associated types. Each field can specify the range of versions for which it is valid, its encoding and whether it includes tagged fields. Further background on the protocol and the implementation used is in the Apache Kafka protocol with serde, quote, syn and proc_macro2 article.
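As an illustration of the version ranges mentioned above: Kafka's JSON descriptors write them as strings such as "0+", "2-4", a single number like "3", or "none". The following is a minimal, std-only sketch of parsing that notation and checking whether a field applies to a given API version. The `VersionRange` type and its methods are hypothetical names used for illustration only; they are not this crate's API, which derives the same information at build time.

```rust
/// A version range as written in a Kafka JSON message descriptor.
/// Hypothetical type for illustration; not part of tansu_sans_io.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionRange {
    /// "none": valid for no version.
    Never,
    /// "N+": valid from version N onwards.
    From(i16),
    /// "N-M" or "N": valid for versions N through M inclusive.
    Between(i16, i16),
}

impl VersionRange {
    /// Parse the descriptor notation, returning `None` for malformed input.
    pub fn parse(s: &str) -> Option<Self> {
        let s = s.trim();
        if s == "none" {
            return Some(VersionRange::Never);
        }
        if let Some(start) = s.strip_suffix('+') {
            return start.parse::<i16>().ok().map(VersionRange::From);
        }
        match s.split_once('-') {
            Some((start, end)) => {
                let start = start.parse::<i16>().ok()?;
                let end = end.parse::<i16>().ok()?;
                (start <= end).then_some(VersionRange::Between(start, end))
            }
            // A bare number denotes exactly one version.
            None => s.parse::<i16>().ok().map(|v| VersionRange::Between(v, v)),
        }
    }

    /// Whether a field with this range is present in `version` of the message.
    pub fn contains(&self, version: i16) -> bool {
        match *self {
            VersionRange::Never => false,
            VersionRange::From(start) => version >= start,
            VersionRange::Between(start, end) => (start..=end).contains(&version),
        }
    }
}
```

A serializer can consult such a range for every field before writing it, skipping fields that do not exist in the negotiated API version.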
Some useful starting points:
- Data Structures - Frame, Request, Response, Header and Body.
- Producing or fetching messages - record, ProduceRequest and FetchRequest
§Examples
Encoding a CreateTopicsRequest:
use tansu_sans_io::{
    ApiKey as _, CreateTopicsRequest, Frame, Header,
    create_topics_request::{CreatableTopic, CreatableTopicConfig},
};

let header = Header::Request {
    api_key: CreateTopicsRequest::KEY,
    api_version: 7,
    correlation_id: 298,
    client_id: Some("adminclient-1".into()),
};

let body = CreateTopicsRequest::default()
    .topics(Some(
        [CreatableTopic::default()
            .name("balances".into())
            .num_partitions(-1)
            .replication_factor(-1)
            .assignments(Some([].into()))
            .configs(Some(
                [CreatableTopicConfig::default()
                    .name("cleanup.policy".into())
                    .value(Some("compact".into()))]
                .into(),
            ))]
        .into(),
    ))
    .timeout_ms(30_000)
    .validate_only(Some(false))
    .into();

let encoded = Frame::request(header, body)?;

Decoding a FindCoordinatorRequest:
use tansu_sans_io::{ApiKey as _, FindCoordinatorRequest, Frame, Header};

let encoded = vec![
    0, 0, 0, 50, 0, 10, 0, 4, 0, 0, 0, 0, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 99, 111,
    110, 115, 117, 109, 101, 114, 0, 0, 2, 20, 116, 101, 115, 116, 45, 99, 111, 110, 115, 117,
    109, 101, 114, 45, 103, 114, 111, 117, 112, 0,
];

assert_eq!(
    Frame {
        size: 50,
        header: Header::Request {
            api_key: FindCoordinatorRequest::KEY,
            api_version: 4,
            correlation_id: 0,
            client_id: Some("console-consumer".into())
        },
        body: FindCoordinatorRequest::default()
            .key(None)
            .key_type(Some(0))
            .coordinator_keys(Some(["test-consumer-group".into()].into()))
            .into()
    },
    Frame::request_from_bytes(&encoded[..])?
);

This crate includes a build-time proc macro that generates simple Rust structures
containing all the fields present in the Kafka message descriptor. Each generated
type implements the serde::Serialize and serde::Deserialize traits. As part of
the generation phase MESSAGE_META is created, which is used by the actual message serializers.
The Kafka protocol is implemented by ser::Encoder and de::Decoder,
using MESSAGE_META to determine which fields are present, their serialization type
and whether any tagged fields can be present for a particular message version. The encoder
maps from the Serde Data Model to the Kafka protocol, and the decoder maps in the opposite direction.
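To make the wire layout concrete, the start of the FindCoordinatorRequest bytes from the decoding example above can also be read by hand. The sketch below uses only the standard library and relies on the Kafka protocol's big-endian fixed-width integers and its nullable string encoding (an i16 length, -1 for null, followed by UTF-8 bytes). `RawRequestHeader` and `read_request_header` are hypothetical names; this is not how the crate decodes frames, which goes through de::Decoder and MESSAGE_META. The sketch stops after the client id and does not interpret the remaining bytes.

```rust
/// The leading fields of a request frame, read by hand.
/// Hypothetical type for illustration; not part of tansu_sans_io.
#[derive(Debug, PartialEq, Eq)]
pub struct RawRequestHeader {
    pub size: i32,
    pub api_key: i16,
    pub api_version: i16,
    pub correlation_id: i32,
    pub client_id: Option<String>,
}

/// Split `n` bytes off the front of `input`, or return `None` if it is too short.
fn take<'a>(input: &mut &'a [u8], n: usize) -> Option<&'a [u8]> {
    let current: &'a [u8] = *input;
    if current.len() < n {
        return None;
    }
    let (head, tail) = current.split_at(n);
    *input = tail;
    Some(head)
}

/// Read a big-endian i16.
fn read_i16(input: &mut &[u8]) -> Option<i16> {
    take(input, 2).map(|b| i16::from_be_bytes([b[0], b[1]]))
}

/// Read a big-endian i32.
fn read_i32(input: &mut &[u8]) -> Option<i32> {
    take(input, 4).map(|b| i32::from_be_bytes([b[0], b[1], b[2], b[3]]))
}

/// Read the frame size and the request header fields up to the client id.
pub fn read_request_header(mut input: &[u8]) -> Option<RawRequestHeader> {
    let size = read_i32(&mut input)?;
    let api_key = read_i16(&mut input)?;
    let api_version = read_i16(&mut input)?;
    let correlation_id = read_i32(&mut input)?;
    // Nullable string: i16 length (-1 means null), then that many UTF-8 bytes.
    let client_id = match read_i16(&mut input)? {
        -1 => None,
        len => {
            let bytes = take(&mut input, usize::try_from(len).ok()?)?;
            Some(String::from_utf8(bytes.to_vec()).ok()?)
        }
    };
    Some(RawRequestHeader {
        size,
        api_key,
        api_version,
        correlation_id,
        client_id,
    })
}
```

Applied to the bytes in the example, this reads a size of 50, API key 10, API version 4, correlation id 0 and the client id console-consumer, matching the Frame that the example expects.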
Re-exports§
pub use de::Decoder;
pub use ser::Encoder;
pub use add_offsets_to_txn_request::AddOffsetsToTxnRequest;
pub use add_offsets_to_txn_response::AddOffsetsToTxnResponse;
pub use add_partitions_to_txn_request::AddPartitionsToTxnRequest;
pub use add_partitions_to_txn_response::AddPartitionsToTxnResponse;
pub use add_raft_voter_request::AddRaftVoterRequest;
pub use add_raft_voter_response::AddRaftVoterResponse;
pub use alter_client_quotas_request::AlterClientQuotasRequest;
pub use alter_client_quotas_response::AlterClientQuotasResponse;
pub use alter_configs_request::AlterConfigsRequest;
pub use alter_configs_response::AlterConfigsResponse;
pub use alter_partition_reassignments_request::AlterPartitionReassignmentsRequest;
pub use alter_partition_reassignments_response::AlterPartitionReassignmentsResponse;
pub use alter_replica_log_dirs_request::AlterReplicaLogDirsRequest;
pub use alter_replica_log_dirs_response::AlterReplicaLogDirsResponse;
pub use alter_user_scram_credentials_request::AlterUserScramCredentialsRequest;
pub use alter_user_scram_credentials_response::AlterUserScramCredentialsResponse;
pub use api_versions_request::ApiVersionsRequest;
pub use api_versions_response::ApiVersionsResponse;
pub use consumer_group_describe_request::ConsumerGroupDescribeRequest;
pub use consumer_group_describe_response::ConsumerGroupDescribeResponse;
pub use consumer_group_heartbeat_request::ConsumerGroupHeartbeatRequest;
pub use consumer_group_heartbeat_response::ConsumerGroupHeartbeatResponse;
pub use create_acls_request::CreateAclsRequest;
pub use create_acls_response::CreateAclsResponse;
pub use create_delegation_token_request::CreateDelegationTokenRequest;
pub use create_delegation_token_response::CreateDelegationTokenResponse;
pub use create_partitions_request::CreatePartitionsRequest;
pub use create_partitions_response::CreatePartitionsResponse;
pub use create_topics_request::CreateTopicsRequest;
pub use create_topics_response::CreateTopicsResponse;
pub use delete_acls_request::DeleteAclsRequest;
pub use delete_acls_response::DeleteAclsResponse;
pub use delete_groups_request::DeleteGroupsRequest;
pub use delete_groups_response::DeleteGroupsResponse;
pub use delete_records_request::DeleteRecordsRequest;
pub use delete_records_response::DeleteRecordsResponse;
pub use delete_topics_request::DeleteTopicsRequest;
pub use delete_topics_response::DeleteTopicsResponse;
pub use describe_acls_request::DescribeAclsRequest;
pub use describe_acls_response::DescribeAclsResponse;
pub use describe_client_quotas_request::DescribeClientQuotasRequest;
pub use describe_client_quotas_response::DescribeClientQuotasResponse;
pub use describe_cluster_request::DescribeClusterRequest;
pub use describe_cluster_response::DescribeClusterResponse;
pub use describe_configs_request::DescribeConfigsRequest;
pub use describe_configs_response::DescribeConfigsResponse;
pub use describe_delegation_token_request::DescribeDelegationTokenRequest;
pub use describe_delegation_token_response::DescribeDelegationTokenResponse;
pub use describe_groups_request::DescribeGroupsRequest;
pub use describe_groups_response::DescribeGroupsResponse;
pub use describe_log_dirs_request::DescribeLogDirsRequest;
pub use describe_log_dirs_response::DescribeLogDirsResponse;
pub use describe_producers_request::DescribeProducersRequest;
pub use describe_producers_response::DescribeProducersResponse;
pub use describe_quorum_request::DescribeQuorumRequest;
pub use describe_quorum_response::DescribeQuorumResponse;
pub use describe_topic_partitions_request::DescribeTopicPartitionsRequest;
pub use describe_topic_partitions_response::DescribeTopicPartitionsResponse;
pub use describe_transactions_request::DescribeTransactionsRequest;
pub use describe_transactions_response::DescribeTransactionsResponse;
pub use describe_user_scram_credentials_request::DescribeUserScramCredentialsRequest;
pub use describe_user_scram_credentials_response::DescribeUserScramCredentialsResponse;
pub use elect_leaders_request::ElectLeadersRequest;
pub use elect_leaders_response::ElectLeadersResponse;
pub use end_txn_request::EndTxnRequest;
pub use end_txn_response::EndTxnResponse;
pub use expire_delegation_token_request::ExpireDelegationTokenRequest;
pub use expire_delegation_token_response::ExpireDelegationTokenResponse;
pub use fetch_request::FetchRequest;
pub use fetch_response::FetchResponse;
pub use find_coordinator_request::FindCoordinatorRequest;
pub use find_coordinator_response::FindCoordinatorResponse;
pub use get_telemetry_subscriptions_request::GetTelemetrySubscriptionsRequest;
pub use get_telemetry_subscriptions_response::GetTelemetrySubscriptionsResponse;
pub use heartbeat_request::HeartbeatRequest;
pub use heartbeat_response::HeartbeatResponse;
pub use incremental_alter_configs_request::IncrementalAlterConfigsRequest;
pub use incremental_alter_configs_response::IncrementalAlterConfigsResponse;
pub use init_producer_id_request::InitProducerIdRequest;
pub use init_producer_id_response::InitProducerIdResponse;
pub use join_group_request::JoinGroupRequest;
pub use join_group_response::JoinGroupResponse;
pub use leave_group_request::LeaveGroupRequest;
pub use leave_group_response::LeaveGroupResponse;
pub use list_client_metrics_resources_response::ListClientMetricsResourcesResponse;
pub use list_groups_request::ListGroupsRequest;
pub use list_groups_response::ListGroupsResponse;
pub use list_offsets_request::ListOffsetsRequest;
pub use list_offsets_response::ListOffsetsResponse;
pub use list_partition_reassignments_request::ListPartitionReassignmentsRequest;
pub use list_partition_reassignments_response::ListPartitionReassignmentsResponse;
pub use list_transactions_request::ListTransactionsRequest;
pub use list_transactions_response::ListTransactionsResponse;
pub use metadata_request::MetadataRequest;
pub use metadata_response::MetadataResponse;
pub use offset_commit_request::OffsetCommitRequest;
pub use offset_commit_response::OffsetCommitResponse;
pub use offset_delete_request::OffsetDeleteRequest;
pub use offset_delete_response::OffsetDeleteResponse;
pub use offset_fetch_request::OffsetFetchRequest;
pub use offset_fetch_response::OffsetFetchResponse;
pub use offset_for_leader_epoch_request::OffsetForLeaderEpochRequest;
pub use offset_for_leader_epoch_response::OffsetForLeaderEpochResponse;
pub use produce_request::ProduceRequest;
pub use produce_response::ProduceResponse;
pub use push_telemetry_request::PushTelemetryRequest;
pub use push_telemetry_response::PushTelemetryResponse;
pub use remove_raft_voter_request::RemoveRaftVoterRequest;
pub use remove_raft_voter_response::RemoveRaftVoterResponse;
pub use renew_delegation_token_request::RenewDelegationTokenRequest;
pub use renew_delegation_token_response::RenewDelegationTokenResponse;
pub use sasl_authenticate_request::SaslAuthenticateRequest;
pub use sasl_authenticate_response::SaslAuthenticateResponse;
pub use sasl_handshake_request::SaslHandshakeRequest;
pub use sasl_handshake_response::SaslHandshakeResponse;
pub use sync_group_request::SyncGroupRequest;
pub use sync_group_response::SyncGroupResponse;
pub use txn_offset_commit_request::TxnOffsetCommitRequest;
pub use txn_offset_commit_response::TxnOffsetCommitResponse;
pub use unregister_broker_request::UnregisterBrokerRequest;
pub use unregister_broker_response::UnregisterBrokerResponse;
pub use update_features_request::UpdateFeaturesRequest;
pub use update_features_response::UpdateFeaturesResponse;
pub use write_txn_markers_request::WriteTxnMarkersRequest;
pub use write_txn_markers_response::WriteTxnMarkersResponse;
Modules§
- acl
- add_offsets_to_txn_request
- add_offsets_to_txn_response
- add_partitions_to_txn_request
- add_partitions_to_txn_response
- add_raft_voter_request
- add_raft_voter_response
- alter_client_quotas_request
- alter_client_quotas_response
- alter_configs_request
- alter_configs_response
- alter_partition_reassignments_request
- alter_partition_reassignments_response
- alter_replica_log_dirs_request
- alter_replica_log_dirs_response
- alter_user_scram_credentials_request
- alter_user_scram_credentials_response
- api_versions_request
- api_versions_response
- consumer_group_describe_request
- consumer_group_describe_response
- consumer_group_heartbeat_request
- consumer_group_heartbeat_response
- create_acls_request
- create_acls_response
- create_delegation_token_request
- create_delegation_token_response
- create_partitions_request
- create_partitions_response
- create_topics_request
- create_topics_response
- de
- delete_acls_request
- delete_acls_response
- delete_groups_request
- delete_groups_response
- delete_records_request
- delete_records_response
- delete_share_group_state_request
- delete_share_group_state_response
- delete_topics_request
- delete_topics_response
- describe_acls_request
- describe_acls_response
- describe_client_quotas_request
- describe_client_quotas_response
- describe_cluster_request
- describe_cluster_response
- describe_configs_request
- describe_configs_response
- describe_delegation_token_request
- describe_delegation_token_response
- describe_groups_request
- describe_groups_response
- describe_log_dirs_request
- describe_log_dirs_response
- describe_producers_request
- describe_producers_response
- describe_quorum_request
- describe_quorum_response
- describe_topic_partitions_request
- describe_topic_partitions_response
- describe_transactions_request
- describe_transactions_response
- describe_user_scram_credentials_request
- describe_user_scram_credentials_response
- elect_leaders_request
- elect_leaders_response
- end_txn_request
- end_txn_response
- expire_delegation_token_request
- expire_delegation_token_response
- fetch_request
- fetch_response
- find_coordinator_request
- find_coordinator_response
- get_telemetry_subscriptions_request
- get_telemetry_subscriptions_response
- heartbeat_request
- heartbeat_response
- incremental_alter_configs_request
- incremental_alter_configs_response
- init_producer_id_request
- init_producer_id_response
- initialize_share_group_state_request
- initialize_share_group_state_response
- join_group_request
- join_group_response
- leave_group_request
- leave_group_response
- list_client_metrics_resources_response
- list_groups_request
- list_groups_response
- list_offsets_request
- list_offsets_response
- list_partition_reassignments_request
- list_partition_reassignments_response
- list_transactions_request
- list_transactions_response
- metadata_request
- metadata_response
- offset_commit_request
- offset_commit_response
- offset_delete_request
- offset_delete_response
- offset_fetch_request
- offset_fetch_response
- offset_for_leader_epoch_request
- offset_for_leader_epoch_response
- primitive
- produce_request
- produce_response
- push_telemetry_request
- push_telemetry_response
- read_share_group_state_request
- read_share_group_state_response
- read_share_group_state_summary_request
- read_share_group_state_summary_response
- record - Kafka Record
- remove_raft_voter_request
- remove_raft_voter_response
- renew_delegation_token_request
- renew_delegation_token_response
- resource
- sasl_authenticate_request
- sasl_authenticate_response
- sasl_handshake_request
- sasl_handshake_response
- ser
- share_acknowledge_request
- share_acknowledge_response
- share_fetch_request
- share_fetch_response
- share_group_describe_request
- share_group_describe_response
- share_group_heartbeat_request
- share_group_heartbeat_response
- sync_group_request
- sync_group_response
- txn_offset_commit_request
- txn_offset_commit_response
- unregister_broker_request
- unregister_broker_response
- update_features_request
- update_features_response
- write_share_group_state_request
- write_share_group_state_response
- write_txn_markers_request
- write_txn_markers_response
Structs§
- BatchAttribute - The produce batch attributes.
- ControlBatch - The control batch marker.
- EndTransactionMarker - An end transaction marker.
- Frame - A Kafka API frame prefixed with its length, followed by a header and the message body.
- RootMessageMeta
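For context on the produce batch attributes listed above: in the Kafka record batch format the attributes are a 16-bit field in which the lowest three bits select the compression codec, bit 3 the timestamp type, bit 4 marks a transactional batch and bit 5 a control batch. The sketch below unpacks those bits with the standard library only. `Codec`, `Attributes` and `decode_attributes` are hypothetical names chosen for illustration; the crate's own BatchAttribute type may expose this differently.

```rust
/// Compression codec held in bits 0-2 of the batch attributes.
/// Hypothetical type for illustration; not part of tansu_sans_io.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Codec {
    Uncompressed,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}

/// Unpacked view of the record batch attribute bits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Attributes {
    pub codec: Codec,
    /// Bit 3: set when the timestamp type is log append time
    /// (clear means create time).
    pub log_append_time: bool,
    /// Bit 4: the batch is part of a transaction.
    pub transactional: bool,
    /// Bit 5: the batch carries control records.
    pub control: bool,
}

/// Decode the attribute bits, returning `None` for an unknown codec value.
pub fn decode_attributes(bits: i16) -> Option<Attributes> {
    let codec = match bits & 0b0111 {
        0 => Codec::Uncompressed,
        1 => Codec::Gzip,
        2 => Codec::Snappy,
        3 => Codec::Lz4,
        4 => Codec::Zstd,
        _ => return None,
    };
    Some(Attributes {
        codec,
        log_append_time: (bits & (1 << 3)) != 0,
        transactional: (bits & (1 << 4)) != 0,
        control: (bits & (1 << 5)) != 0,
    })
}
```

Returning `None` for codec values 5 to 7 keeps the sketch strict about inputs it does not understand rather than silently treating them as uncompressed.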
Enums§
- Ack - Produce message acknowledgement.
- Body - A Kafka API request or response message body.
- Compression - Kafka message compression types.
- ConfigResource - What type of resource is the configuration describing.
- ConfigSource - From which source was the configuration provided.
- ConfigType - The type of configuration.
- CoordinatorType - The coordinator type.
- EndpointType - The endpoint type.
- Error
- ErrorCode - Kafka API response error codes.
- Header - A Kafka API request or response header.
- IsolationLevel - The fetch isolation level.
- ListOffset - List Offset
- OpType - The configuration operation type.
- ScramMechanism
- TimestampType - The timestamp type.
Constants§
- NULL_TOPIC_ID - The null topic identifier.
Statics§
Traits§
- ApiKey
- ApiName
- Decode
- Encode
- Request - All Kafka API requests implement this trait
- Response - All Kafka API responses implement this trait
Functions§
- to_system_time - Convert a Kafka timestamp into system time
- to_timestamp - Convert system time into a Kafka timestamp
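
As a rough illustration of the two conversions listed above: Kafka timestamps are signed 64-bit counts of milliseconds since the Unix epoch, with -1 used to mean that no timestamp is set. The sketch below performs both conversions with the standard library. The function names and the `Option` return types are assumptions made for this example; the signatures of the crate's to_system_time and to_timestamp are not shown on this page and may differ.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Convert milliseconds since the Unix epoch into a `SystemTime`.
/// Returns `None` for negative values, including the -1 "no timestamp" marker.
/// Hypothetical helper for illustration; not part of tansu_sans_io.
pub fn millis_to_system_time(timestamp: i64) -> Option<SystemTime> {
    let millis = u64::try_from(timestamp).ok()?;
    UNIX_EPOCH.checked_add(Duration::from_millis(millis))
}

/// Convert a `SystemTime` into milliseconds since the Unix epoch.
/// Returns `None` for times before the epoch or beyond the range of an i64.
pub fn system_time_to_millis(time: SystemTime) -> Option<i64> {
    let elapsed = time.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(elapsed.as_millis()).ok()
}
```

Any precision finer than a millisecond is truncated by the second function, so a round trip through a Kafka timestamp does not preserve sub-millisecond detail.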