
Crate tansu_sans_io


A Kafka protocol implementation that performs no I/O (it operates only on bytes)

§Design

Apache Kafka defines each API message with a JSON message descriptor. Each descriptor contains a list of fields together with their associated types. Each field can specify the range of versions for which it is valid, its encoding, and whether it includes tagged fields. Further background on the protocol and the implementation is in the article "Apache Kafka protocol with serde, quote, syn and proc_macro2".

Some useful starting points:

§Examples

Encoding a CreateTopicsRequest:

use tansu_sans_io::{
    ApiKey as _, CreateTopicsRequest, Frame, Header,
    create_topics_request::{CreatableTopic, CreatableTopicConfig},
};

let header = Header::Request {
    api_key: CreateTopicsRequest::KEY,
    api_version: 7,
    correlation_id: 298,
    client_id: Some("adminclient-1".into()),
};

let body = CreateTopicsRequest::default()
    .topics(Some(
        [CreatableTopic::default()
            .name("balances".into())
            .num_partitions(-1)
            .replication_factor(-1)
            .assignments(Some([].into()))
            .configs(Some(
                [CreatableTopicConfig::default()
                    .name("cleanup.policy".into())
                    .value(Some("compact".into()))]
                .into(),
            ))]
        .into(),
    ))
    .timeout_ms(30_000)
    .validate_only(Some(false))
    .into();

let encoded = Frame::request(header, body)?;

Decoding a FindCoordinatorRequest:

use tansu_sans_io::{ApiKey as _, FindCoordinatorRequest, Frame, Header};

let encoded = vec![
    0, 0, 0, 50, 0, 10, 0, 4, 0, 0, 0, 0, 0, 16, 99, 111, 110, 115, 111, 108, 101, 45, 99, 111,
    110, 115, 117, 109, 101, 114, 0, 0, 2, 20, 116, 101, 115, 116, 45, 99, 111, 110, 115, 117,
    109, 101, 114, 45, 103, 114, 111, 117, 112, 0,
];

assert_eq!(
    Frame {
        size: 50,
        header: Header::Request {
            api_key: FindCoordinatorRequest::KEY,
            api_version: 4,
            correlation_id: 0,
            client_id: Some("console-consumer".into())
        },
        body: FindCoordinatorRequest::default()
            .key(None)
            .key_type(Some(0))
            .coordinator_keys(Some(["test-consumer-group".into()].into()))
            .into()
    },
    Frame::request_from_bytes(&encoded[..])?
);
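To make the byte layout in the decoding example easier to follow, here is a minimal sketch that reads only the fixed leading fields of a request frame by hand: a big-endian i32 size, i16 API key, i16 API version, i32 correlation id, and an i16-length-prefixed client id. It uses only the standard library. `RawHeader`, `be_i16`, `be_i32` and `parse_request_header` are hypothetical names for illustration and are not part of tansu_sans_io, which derives all of this from the message descriptors instead.

```rust
/// Hypothetical, illustration-only view of the leading request header fields.
#[derive(Debug, PartialEq)]
struct RawHeader {
    size: i32,
    api_key: i16,
    api_version: i16,
    correlation_id: i32,
    client_id: Option<String>,
}

/// Read a big-endian i16 at `at`, or None if the slice is too short.
fn be_i16(b: &[u8], at: usize) -> Option<i16> {
    let s = b.get(at..at + 2)?;
    Some(i16::from_be_bytes([s[0], s[1]]))
}

/// Read a big-endian i32 at `at`, or None if the slice is too short.
fn be_i32(b: &[u8], at: usize) -> Option<i32> {
    let s = b.get(at..at + 4)?;
    Some(i32::from_be_bytes([s[0], s[1], s[2], s[3]]))
}

/// Parse the size prefix and the leading header fields of a request frame.
/// Tagged fields and the message body are deliberately not handled here.
fn parse_request_header(bytes: &[u8]) -> Option<RawHeader> {
    let size = be_i32(bytes, 0)?;
    let api_key = be_i16(bytes, 4)?;
    let api_version = be_i16(bytes, 6)?;
    let correlation_id = be_i32(bytes, 8)?;

    // The client id is a nullable string: a negative length means null.
    let len = be_i16(bytes, 12)?;
    let client_id = if len < 0 {
        None
    } else {
        let end = 14 + len as usize;
        Some(String::from_utf8(bytes.get(14..end)?.to_vec()).ok()?)
    };

    Some(RawHeader {
        size,
        api_key,
        api_version,
        correlation_id,
        client_id,
    })
}
```

Applied to the bytes in the example above, this yields a size of 50 (the frame length excluding the 4-byte size prefix), API key 10, API version 4, correlation id 0 and the client id "console-consumer".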

This crate includes a build-time proc macro that generates simple Rust structures containing all the fields present in the Kafka message descriptor. Each generated type implements the serde::Serialize and serde::Deserialize traits. The generation phase also creates MESSAGE_META, which is used by the actual message serializers.

The Kafka protocol is implemented by ser::Encoder and de::Decoder, using MESSAGE_META to determine which fields are present, their serialization type and whether any tagged fields can be present for a particular message version. Serializers map from the Serde Data Model to the Kafka protocol or vice versa.
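The idea of selecting fields by message version can be sketched with the standard library alone. The example below parses descriptor-style version strings ("0-3", "4+" or a single version) and filters a field list for a given API version. `VersionRange` and `fields_present` are hypothetical names; this does not mirror the actual layout of MESSAGE_META, and the field ranges used in the usage note are assumed from the upstream FindCoordinatorRequest descriptor.

```rust
/// Hypothetical inclusive version range, parsed from descriptor strings
/// such as "0-3", "4+" or "2".
#[derive(Debug, Clone, Copy, PartialEq)]
struct VersionRange {
    start: i16,
    end: i16,
}

impl VersionRange {
    /// Parse a version string; returns None if it is not well formed.
    fn parse(s: &str) -> Option<Self> {
        if let Some(start) = s.strip_suffix('+') {
            // "4+" means version 4 and every later version.
            Some(Self { start: start.parse().ok()?, end: i16::MAX })
        } else if let Some((start, end)) = s.split_once('-') {
            // "0-3" means versions 0 through 3 inclusive.
            Some(Self { start: start.parse().ok()?, end: end.parse().ok()? })
        } else {
            // A bare number means exactly that version.
            let v = s.parse().ok()?;
            Some(Self { start: v, end: v })
        }
    }

    fn contains(&self, version: i16) -> bool {
        self.start <= version && version <= self.end
    }
}

/// Return the names of the fields that are valid for `version`,
/// given (name, versions) pairs. Unparseable ranges are skipped.
fn fields_present<'a>(fields: &[(&'a str, &str)], version: i16) -> Vec<&'a str> {
    fields
        .iter()
        .filter(|(_, versions)| {
            VersionRange::parse(versions).map_or(false, |r| r.contains(version))
        })
        .map(|(name, _)| *name)
        .collect()
}
```

With the assumed field list `[("Key", "0-3"), ("KeyType", "1+"), ("CoordinatorKeys", "4+")]`, version 4 selects KeyType and CoordinatorKeys but not Key, which is consistent with the FindCoordinatorRequest decoding example above, where key is None at API version 4.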

Re-exports§

pub use de::Decoder;
pub use ser::Encoder;
pub use add_offsets_to_txn_request::AddOffsetsToTxnRequest;
pub use add_offsets_to_txn_response::AddOffsetsToTxnResponse;
pub use add_partitions_to_txn_request::AddPartitionsToTxnRequest;
pub use add_partitions_to_txn_response::AddPartitionsToTxnResponse;
pub use add_raft_voter_request::AddRaftVoterRequest;
pub use add_raft_voter_response::AddRaftVoterResponse;
pub use alter_client_quotas_request::AlterClientQuotasRequest;
pub use alter_client_quotas_response::AlterClientQuotasResponse;
pub use alter_configs_request::AlterConfigsRequest;
pub use alter_configs_response::AlterConfigsResponse;
pub use alter_partition_reassignments_request::AlterPartitionReassignmentsRequest;
pub use alter_partition_reassignments_response::AlterPartitionReassignmentsResponse;
pub use alter_replica_log_dirs_request::AlterReplicaLogDirsRequest;
pub use alter_replica_log_dirs_response::AlterReplicaLogDirsResponse;
pub use alter_user_scram_credentials_request::AlterUserScramCredentialsRequest;
pub use alter_user_scram_credentials_response::AlterUserScramCredentialsResponse;
pub use api_versions_request::ApiVersionsRequest;
pub use api_versions_response::ApiVersionsResponse;
pub use consumer_group_describe_request::ConsumerGroupDescribeRequest;
pub use consumer_group_describe_response::ConsumerGroupDescribeResponse;
pub use consumer_group_heartbeat_request::ConsumerGroupHeartbeatRequest;
pub use consumer_group_heartbeat_response::ConsumerGroupHeartbeatResponse;
pub use create_acls_request::CreateAclsRequest;
pub use create_acls_response::CreateAclsResponse;
pub use create_delegation_token_request::CreateDelegationTokenRequest;
pub use create_delegation_token_response::CreateDelegationTokenResponse;
pub use create_partitions_request::CreatePartitionsRequest;
pub use create_partitions_response::CreatePartitionsResponse;
pub use create_topics_request::CreateTopicsRequest;
pub use create_topics_response::CreateTopicsResponse;
pub use delete_acls_request::DeleteAclsRequest;
pub use delete_acls_response::DeleteAclsResponse;
pub use delete_groups_request::DeleteGroupsRequest;
pub use delete_groups_response::DeleteGroupsResponse;
pub use delete_records_request::DeleteRecordsRequest;
pub use delete_records_response::DeleteRecordsResponse;
pub use delete_share_group_state_request::DeleteShareGroupStateRequest;
pub use delete_share_group_state_response::DeleteShareGroupStateResponse;
pub use delete_topics_request::DeleteTopicsRequest;
pub use delete_topics_response::DeleteTopicsResponse;
pub use describe_acls_request::DescribeAclsRequest;
pub use describe_acls_response::DescribeAclsResponse;
pub use describe_client_quotas_request::DescribeClientQuotasRequest;
pub use describe_client_quotas_response::DescribeClientQuotasResponse;
pub use describe_cluster_request::DescribeClusterRequest;
pub use describe_cluster_response::DescribeClusterResponse;
pub use describe_configs_request::DescribeConfigsRequest;
pub use describe_configs_response::DescribeConfigsResponse;
pub use describe_delegation_token_request::DescribeDelegationTokenRequest;
pub use describe_delegation_token_response::DescribeDelegationTokenResponse;
pub use describe_groups_request::DescribeGroupsRequest;
pub use describe_groups_response::DescribeGroupsResponse;
pub use describe_log_dirs_request::DescribeLogDirsRequest;
pub use describe_log_dirs_response::DescribeLogDirsResponse;
pub use describe_producers_request::DescribeProducersRequest;
pub use describe_producers_response::DescribeProducersResponse;
pub use describe_quorum_request::DescribeQuorumRequest;
pub use describe_quorum_response::DescribeQuorumResponse;
pub use describe_topic_partitions_request::DescribeTopicPartitionsRequest;
pub use describe_topic_partitions_response::DescribeTopicPartitionsResponse;
pub use describe_transactions_request::DescribeTransactionsRequest;
pub use describe_transactions_response::DescribeTransactionsResponse;
pub use describe_user_scram_credentials_request::DescribeUserScramCredentialsRequest;
pub use describe_user_scram_credentials_response::DescribeUserScramCredentialsResponse;
pub use elect_leaders_request::ElectLeadersRequest;
pub use elect_leaders_response::ElectLeadersResponse;
pub use end_txn_request::EndTxnRequest;
pub use end_txn_response::EndTxnResponse;
pub use expire_delegation_token_request::ExpireDelegationTokenRequest;
pub use expire_delegation_token_response::ExpireDelegationTokenResponse;
pub use fetch_request::FetchRequest;
pub use fetch_response::FetchResponse;
pub use find_coordinator_request::FindCoordinatorRequest;
pub use find_coordinator_response::FindCoordinatorResponse;
pub use get_telemetry_subscriptions_request::GetTelemetrySubscriptionsRequest;
pub use get_telemetry_subscriptions_response::GetTelemetrySubscriptionsResponse;
pub use heartbeat_request::HeartbeatRequest;
pub use heartbeat_response::HeartbeatResponse;
pub use incremental_alter_configs_request::IncrementalAlterConfigsRequest;
pub use incremental_alter_configs_response::IncrementalAlterConfigsResponse;
pub use init_producer_id_request::InitProducerIdRequest;
pub use init_producer_id_response::InitProducerIdResponse;
pub use initialize_share_group_state_request::InitializeShareGroupStateRequest;
pub use initialize_share_group_state_response::InitializeShareGroupStateResponse;
pub use join_group_request::JoinGroupRequest;
pub use join_group_response::JoinGroupResponse;
pub use leave_group_request::LeaveGroupRequest;
pub use leave_group_response::LeaveGroupResponse;
pub use list_client_metrics_resources_response::ListClientMetricsResourcesResponse;
pub use list_groups_request::ListGroupsRequest;
pub use list_groups_response::ListGroupsResponse;
pub use list_offsets_request::ListOffsetsRequest;
pub use list_offsets_response::ListOffsetsResponse;
pub use list_partition_reassignments_request::ListPartitionReassignmentsRequest;
pub use list_partition_reassignments_response::ListPartitionReassignmentsResponse;
pub use list_transactions_request::ListTransactionsRequest;
pub use list_transactions_response::ListTransactionsResponse;
pub use metadata_request::MetadataRequest;
pub use metadata_response::MetadataResponse;
pub use offset_commit_request::OffsetCommitRequest;
pub use offset_commit_response::OffsetCommitResponse;
pub use offset_delete_request::OffsetDeleteRequest;
pub use offset_delete_response::OffsetDeleteResponse;
pub use offset_fetch_request::OffsetFetchRequest;
pub use offset_fetch_response::OffsetFetchResponse;
pub use offset_for_leader_epoch_request::OffsetForLeaderEpochRequest;
pub use offset_for_leader_epoch_response::OffsetForLeaderEpochResponse;
pub use produce_request::ProduceRequest;
pub use produce_response::ProduceResponse;
pub use push_telemetry_request::PushTelemetryRequest;
pub use push_telemetry_response::PushTelemetryResponse;
pub use read_share_group_state_request::ReadShareGroupStateRequest;
pub use read_share_group_state_response::ReadShareGroupStateResponse;
pub use read_share_group_state_summary_request::ReadShareGroupStateSummaryRequest;
pub use read_share_group_state_summary_response::ReadShareGroupStateSummaryResponse;
pub use remove_raft_voter_request::RemoveRaftVoterRequest;
pub use remove_raft_voter_response::RemoveRaftVoterResponse;
pub use renew_delegation_token_request::RenewDelegationTokenRequest;
pub use renew_delegation_token_response::RenewDelegationTokenResponse;
pub use sasl_authenticate_request::SaslAuthenticateRequest;
pub use sasl_authenticate_response::SaslAuthenticateResponse;
pub use sasl_handshake_request::SaslHandshakeRequest;
pub use sasl_handshake_response::SaslHandshakeResponse;
pub use share_acknowledge_request::ShareAcknowledgeRequest;
pub use share_acknowledge_response::ShareAcknowledgeResponse;
pub use share_fetch_request::ShareFetchRequest;
pub use share_fetch_response::ShareFetchResponse;
pub use share_group_describe_request::ShareGroupDescribeRequest;
pub use share_group_describe_response::ShareGroupDescribeResponse;
pub use share_group_heartbeat_request::ShareGroupHeartbeatRequest;
pub use share_group_heartbeat_response::ShareGroupHeartbeatResponse;
pub use sync_group_request::SyncGroupRequest;
pub use sync_group_response::SyncGroupResponse;
pub use txn_offset_commit_request::TxnOffsetCommitRequest;
pub use txn_offset_commit_response::TxnOffsetCommitResponse;
pub use unregister_broker_request::UnregisterBrokerRequest;
pub use unregister_broker_response::UnregisterBrokerResponse;
pub use update_features_request::UpdateFeaturesRequest;
pub use update_features_response::UpdateFeaturesResponse;
pub use write_share_group_state_request::WriteShareGroupStateRequest;
pub use write_share_group_state_response::WriteShareGroupStateResponse;
pub use write_txn_markers_request::WriteTxnMarkersRequest;
pub use write_txn_markers_response::WriteTxnMarkersResponse;

Modules§

acl
add_offsets_to_txn_request
add_offsets_to_txn_response
add_partitions_to_txn_request
add_partitions_to_txn_response
add_raft_voter_request
add_raft_voter_response
alter_client_quotas_request
alter_client_quotas_response
alter_configs_request
alter_configs_response
alter_partition_reassignments_request
alter_partition_reassignments_response
alter_replica_log_dirs_request
alter_replica_log_dirs_response
alter_user_scram_credentials_request
alter_user_scram_credentials_response
api_versions_request
api_versions_response
consumer_group_describe_request
consumer_group_describe_response
consumer_group_heartbeat_request
consumer_group_heartbeat_response
create_acls_request
create_acls_response
create_delegation_token_request
create_delegation_token_response
create_partitions_request
create_partitions_response
create_topics_request
create_topics_response
de
delete_acls_request
delete_acls_response
delete_groups_request
delete_groups_response
delete_records_request
delete_records_response
delete_share_group_state_request
delete_share_group_state_response
delete_topics_request
delete_topics_response
describe_acls_request
describe_acls_response
describe_client_quotas_request
describe_client_quotas_response
describe_cluster_request
describe_cluster_response
describe_configs_request
describe_configs_response
describe_delegation_token_request
describe_delegation_token_response
describe_groups_request
describe_groups_response
describe_log_dirs_request
describe_log_dirs_response
describe_producers_request
describe_producers_response
describe_quorum_request
describe_quorum_response
describe_topic_partitions_request
describe_topic_partitions_response
describe_transactions_request
describe_transactions_response
describe_user_scram_credentials_request
describe_user_scram_credentials_response
elect_leaders_request
elect_leaders_response
end_txn_request
end_txn_response
expire_delegation_token_request
expire_delegation_token_response
fetch_request
fetch_response
find_coordinator_request
find_coordinator_response
get_telemetry_subscriptions_request
get_telemetry_subscriptions_response
heartbeat_request
heartbeat_response
incremental_alter_configs_request
incremental_alter_configs_response
init_producer_id_request
init_producer_id_response
initialize_share_group_state_request
initialize_share_group_state_response
join_group_request
join_group_response
leave_group_request
leave_group_response
list_client_metrics_resources_response
list_groups_request
list_groups_response
list_offsets_request
list_offsets_response
list_partition_reassignments_request
list_partition_reassignments_response
list_transactions_request
list_transactions_response
metadata_request
metadata_response
offset_commit_request
offset_commit_response
offset_delete_request
offset_delete_response
offset_fetch_request
offset_fetch_response
offset_for_leader_epoch_request
offset_for_leader_epoch_response
primitive
produce_request
produce_response
push_telemetry_request
push_telemetry_response
read_share_group_state_request
read_share_group_state_response
read_share_group_state_summary_request
read_share_group_state_summary_response
record
Kafka Record
remove_raft_voter_request
remove_raft_voter_response
renew_delegation_token_request
renew_delegation_token_response
resource
sasl_authenticate_request
sasl_authenticate_response
sasl_handshake_request
sasl_handshake_response
ser
share_acknowledge_request
share_acknowledge_response
share_fetch_request
share_fetch_response
share_group_describe_request
share_group_describe_response
share_group_heartbeat_request
share_group_heartbeat_response
sync_group_request
sync_group_response
txn_offset_commit_request
txn_offset_commit_response
unregister_broker_request
unregister_broker_response
update_features_request
update_features_response
write_share_group_state_request
write_share_group_state_response
write_txn_markers_request
write_txn_markers_response

Structs§

BatchAttribute
The produce batch attributes.
ControlBatch
The control batch marker.
EndTransactionMarker
An end transaction marker.
Frame
A Kafka API frame prefixed with its length, followed by a header and the message body.
RootMessageMeta

Enums§

Ack
Produce message acknowledgement.
Body
A Kafka API request or response message body.
Compression
Kafka message compression types.
ConfigResource
The type of resource that the configuration describes.
ConfigSource
The source from which the configuration was provided.
ConfigType
The type of configuration.
CoordinatorType
The coordinator type.
EndpointType
The endpoint type.
Error
ErrorCode
Kafka API response error codes.
Header
A Kafka API request or response header.
IsolationLevel
The fetch isolation level.
ListOffset
List Offset
OpType
The configuration operation type.
ScramMechanism
TimestampType
The timestamp type.

Constants§

NULL_TOPIC_ID
The null topic identifier.

Statics§

MESSAGE_META

Traits§

ApiKey
ApiName
Decode
Encode
Request
All Kafka API requests implement this trait.
Response
All Kafka API responses implement this trait.

Functions§

to_system_time
Convert a Kafka timestamp into system time.
to_timestamp
Convert system time into a Kafka timestamp.
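
Kafka timestamps are signed 64-bit counts of milliseconds since the Unix epoch. As a rough sketch of what such conversions involve, using only the standard library, the helpers below are hypothetical (`millis_to_system_time` and `system_time_to_millis` are not the crate's functions, and the signatures of to_system_time and to_timestamp may differ):

```rust
use std::convert::TryFrom;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical helper: milliseconds since the Unix epoch to a SystemTime.
/// Returns None if the result cannot be represented on this platform.
fn millis_to_system_time(millis: i64) -> Option<SystemTime> {
    if millis >= 0 {
        UNIX_EPOCH.checked_add(Duration::from_millis(millis as u64))
    } else {
        // Negative values denote instants before the epoch.
        UNIX_EPOCH.checked_sub(Duration::from_millis(millis.unsigned_abs()))
    }
}

/// Hypothetical helper: a SystemTime to milliseconds since the Unix epoch.
/// Sub-millisecond precision is truncated; returns None on overflow.
fn system_time_to_millis(time: SystemTime) -> Option<i64> {
    match time.duration_since(UNIX_EPOCH) {
        Ok(after) => i64::try_from(after.as_millis()).ok(),
        // `time` is before the epoch: the error carries the distance to it.
        Err(before) => i64::try_from(before.duration().as_millis())
            .ok()
            .map(|ms| -ms),
    }
}
```

Values of whole milliseconds round-trip through both helpers unchanged, provided the platform's SystemTime can represent the instant.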

Type Aliases§

Result