//! # Kafka Threadpool for Rust with mTLS Support
//!
//! An async Rust threadpool for publishing messages to Kafka using the ``SSL`` (mTLS) or ``PLAINTEXT`` protocols.
//!
//! ## Architecture
//!
//! This is a work in progress, and the architecture will likely change over time. For now, here is the latest reference architecture:
//!
//! ![kafka-threadpool Reference Architecture v1](https://raw.githubusercontent.com/jay-johnson/rust-kafka-threadpool/main/images/kafka_threadpool_design_v1.png)
//!
//! ## Background
//!
//! Please refer to the [blog post](https://jaypjohnson.com/2022-09-19-designing-a-high-performance-rust-threadpool-for-kafka-with-mtls.html) for more information on this repo.
//!
//! ## Configuration
//!
//! ### Supported Environment Variables
//!
//! | Environment Variable Name        | Purpose / Value                                |
//! | -------------------------------- | ---------------------------------------------- |
//! | KAFKA_ENABLED                    | set to ``true`` or ``1`` to enable the threadpool; any other value disables it |
//! | KAFKA_LOG_LABEL                  | tracking label that shows up in all crate logs |
//! | KAFKA_BROKERS                    | comma-delimited list of brokers (``host1:port,host2:port,host3:port``) |
//! | KAFKA_TOPICS                     | comma-delimited list of supported topics |
//! | KAFKA_PUBLISH_RETRY_INTERVAL_SEC | number of seconds to sleep before each publish retry |
//! | KAFKA_PUBLISH_IDLE_INTERVAL_SEC  | number of seconds to sleep if there are no messages to process |
//! | KAFKA_NUM_THREADS                | number of threads for the threadpool |
//! | KAFKA_TLS_CLIENT_KEY             | optional - path to the kafka mTLS key |
//! | KAFKA_TLS_CLIENT_CERT            | optional - path to the kafka mTLS certificate |
//! | KAFKA_TLS_CLIENT_CA              | optional - path to the kafka mTLS certificate authority (CA) |
//! | KAFKA_METADATA_COUNT_MSG_OFFSETS | optional - set to anything but ``true`` to bypass counting the offsets |
//!
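//! As a rough illustration of how values like these might be read, here is a minimal sketch that uses only the standard library. It is not this crate's actual configuration code: the helper names (``is_enabled``, ``parse_list``, ``parse_secs``) and the ``1.0`` second fallback are assumptions made for the example.
//!
//! ```rust
//! use std::env;
//! use std::time::Duration;
//!
//! // Hypothetical helper: only "true" or "1" count as enabled.
//! fn is_enabled(value: Option<&str>) -> bool {
//!     matches!(value, Some("true") | Some("1"))
//! }
//!
//! // Hypothetical helper: split a comma-delimited list and drop empty entries.
//! fn parse_list(value: Option<&str>) -> Vec<String> {
//!     value
//!         .unwrap_or("")
//!         .split(',')
//!         .map(str::trim)
//!         .filter(|s| !s.is_empty())
//!         .map(String::from)
//!         .collect()
//! }
//!
//! // Hypothetical helper: parse a seconds value such as "1.0".
//! // Missing, unparsable, negative, or non-finite input falls back to
//! // `default_secs` (an assumed default, not taken from the crate).
//! fn parse_secs(value: Option<&str>, default_secs: f64) -> Duration {
//!     let secs = value
//!         .and_then(|v| v.trim().parse::<f64>().ok())
//!         .filter(|s| s.is_finite() && *s >= 0.0)
//!         .unwrap_or(default_secs);
//!     Duration::from_secs_f64(secs)
//! }
//!
//! fn main() {
//!     let enabled = is_enabled(env::var("KAFKA_ENABLED").ok().as_deref());
//!     let brokers = parse_list(env::var("KAFKA_BROKERS").ok().as_deref());
//!     let topics = parse_list(env::var("KAFKA_TOPICS").ok().as_deref());
//!     let retry = parse_secs(
//!         env::var("KAFKA_PUBLISH_RETRY_INTERVAL_SEC").ok().as_deref(),
//!         1.0,
//!     );
//!     println!("enabled={enabled} brokers={brokers:?} topics={topics:?} retry={retry:?}");
//! }
//! ```
//!
//! Taking ``Option<&str>`` instead of reading the environment inside each helper keeps the parsing logic easy to test without mutating process-wide state.
//!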
//! ## Getting Started
//!
//! Please ensure your kafka cluster is running before starting. If you need help running a kafka cluster, please refer to the [rust-with-strimzi-kafka-and-tls repo](https://github.com/jay-johnson/rust-with-strimzi-kafka-and-tls) for more details.
//!
//! ### Set up the Environment Variables
//!
//! You can create a ``./env/kafka.env`` file that stores the environment variables so your producer and consumer stay consistent (and ready for podman/docker or kubernetes):
//!
//! ```bash
//! export KAFKA_ENABLED=1
//! export KAFKA_LOG_LABEL="ktp"
//! export KAFKA_BROKERS="host1:port,host2:port,host3:port"
//! export KAFKA_TOPICS="testing"
//! export KAFKA_PUBLISH_RETRY_INTERVAL_SEC="1.0"
//! export KAFKA_NUM_THREADS="5"
//! export KAFKA_TLS_CLIENT_CA="PATH_TO_TLS_CA_FILE"
//! export KAFKA_TLS_CLIENT_CERT="PATH_TO_TLS_CERT_FILE"
//! export KAFKA_TLS_CLIENT_KEY="PATH_TO_TLS_KEY_FILE"
//! export KAFKA_METADATA_COUNT_MSG_OFFSETS="true"
//! ```
//!
//! #### Load the Environment
//!
//! ```bash
//! source ./env/kafka.env
//! ```
//!
//! ### Start the Kafka Threadpool and Publish 100 Messages
//!
//! The included [./examples/start-threadpool.rs](https://github.com/jay-johnson/rust-kafka-threadpool/blob/main/examples/start-threadpool.rs) example connects to the kafka cluster based on the environment configuration and publishes 100 messages into the kafka ``testing`` topic.
//!
//! ```bash
//! cargo build --example start-threadpool
//! export RUST_BACKTRACE=1
//! export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
//! ./target/debug/examples/start-threadpool
//! ```
//!
//! ### Consume Messages
//!
//! To consume the newly-published test messages from the ``testing`` topic, you can use your own consumer or the [rust-with-strimzi-kafka-and-tls/examples/run-consumer.rs](https://github.com/jay-johnson/rust-with-strimzi-kafka-and-tls/blob/main/examples/run-consumer.rs) example:
//!
//! ```bash
//! # from the rust-with-strimzi-kafka-and-tls directory:
//! cargo build --example run-consumer
//! export RUST_BACKTRACE=1
//! export RUST_LOG=info,rdkafka=info
//! ./target/debug/examples/run-consumer -g rust-consumer-testing -t testing
//! ```
//!
//! ### Get Kafka Cluster Metadata for All Topics, Partitions, ISR, and Offsets
//!
//! Run the [./examples/get-all-metadata.rs](https://github.com/jay-johnson/rust-kafka-threadpool/blob/main/examples/get-all-metadata.rs) example:
//!
//! ```bash
//! cargo build --example get-all-metadata
//! export RUST_BACKTRACE=1
//! export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
//! ./target/debug/examples/get-all-metadata
//! ```
//!
//! ### Get Kafka Cluster Metadata for a Single Topic Including Partitions, ISR, and Offsets
//!
//! 1.  Set the Topic Name as an Environment Variable
//!
//!     ```bash
//!     export KAFKA_TOPIC=testing
//!     ```
//!
//! 1.  Run the [./examples/get-metadata-for-topic.rs](https://github.com/jay-johnson/rust-kafka-threadpool/blob/main/examples/get-metadata-for-topic.rs) example:
//!
//!     ```bash
//!     cargo build --example get-metadata-for-topic
//!     export RUST_BACKTRACE=1
//!     export RUST_LOG=info,kafka_threadpool=info,rdkafka=info
//!     ./target/debug/examples/get-metadata-for-topic
//!     ```
//!
pub mod api;
pub mod config;
pub mod kafka_publisher;
pub mod metadata;
pub mod msg;
pub mod pool;
pub mod start_threadpool;
pub mod thread_process_messages_handler;