# CI for the Rust Kafka client: a lint job (rustfmt + clippy) and a matrix of
# integration-test jobs that run against a locally started Kafka broker.
name: Rust
on:
  push:
    branches: [ "main", "master" ]
  pull_request:
    branches: [ "main", "master" ]
env:
  CARGO_TERM_COLOR: always
  # Force serial test execution. NOTE(review): presumably because the
  # integration tests share one broker and its topics (see the test job
  # below) and would interfere when run in parallel — confirm with the suite.
  RUST_TEST_THREADS: 1
jobs:
  # Static checks only — no broker needed. Formatting and lints are owned by
  # this job so the test matrix does not have to install rustfmt/clippy.
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # actions-rs/toolchain is archived and unmaintained (deprecated Node
      # runtime, deprecated set-output commands); dtolnay/rust-toolchain is
      # the maintained replacement. Pinning the action tag to 1.85.0 selects
      # that toolchain; the install is minimal-profile by default.
      - name: Set up Rust
        uses: dtolnay/rust-toolchain@1.85.0
        with:
          components: rustfmt, clippy
      - name: Check formatting
        run: cargo fmt --all --check
      # -D warnings makes clippy lints fail the job; without it the step
      # succeeds regardless of findings and the lint job catches nothing.
      - name: Clippy
        run: cargo clippy --all-targets --all-features -- -D warnings
test:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- KAFKA_VER: 3.8.1
COMPRESSION: NONE
SECURE: ""
- KAFKA_VER: 3.9.1
COMPRESSION: NONE
SECURE: ""
- KAFKA_VER: 4.0.1
COMPRESSION: NONE
SECURE: ""
- KAFKA_VER: 4.1.0
COMPRESSION: NONE
SECURE: ""
- KAFKA_VER: 4.1.0
COMPRESSION: GZIP
SECURE: ""
- KAFKA_VER: 4.1.0
COMPRESSION: SNAPPY
SECURE: ""
- KAFKA_VER: 4.1.0
COMPRESSION: NONE
SECURE: tls
- KAFKA_VER: 4.1.0
COMPRESSION: NONE
SECURE: sasl_plaintext
steps:
- uses: actions/checkout@v4
- name: Set up Rust
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.85.0
components: rustfmt, clippy
- name: Set up Java
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: "17"
- name: Cache Kafka distribution
uses: actions/cache@v4
with:
path: ~/.cache/apache-kafka/${{ matrix.KAFKA_VER }}
key: apache-kafka-${{ runner.os }}-${{ matrix.KAFKA_VER }}
- name: Build
run: cargo build
- name: Download Kafka (if needed)
run: |
set -euo pipefail
KAFKA_VER="${{ matrix.KAFKA_VER }}"
KAFKA_HOME="$HOME/.cache/apache-kafka/$KAFKA_VER"
if [[ -d "$KAFKA_HOME" ]]; then
echo "Using cached Kafka at $KAFKA_HOME"
else
mkdir -p "$HOME/.cache/apache-kafka"
curl -fsSL "https://archive.apache.org/dist/kafka/${KAFKA_VER}/kafka_2.13-${KAFKA_VER}.tgz" | tar xz -C "$HOME/.cache/apache-kafka"
mv "$HOME/.cache/apache-kafka/kafka_2.13-${KAFKA_VER}" "$KAFKA_HOME"
fi
echo "KAFKA_HOME=$KAFKA_HOME" >> "$GITHUB_ENV"
- name: Start Kafka (KRaft, no Docker)
run: |
set -euo pipefail
DATA_DIR="$RUNNER_TEMP/kafka-data-${{ matrix.KAFKA_VER }}"
LOG_DIR="$DATA_DIR/logs"
CONF="$RUNNER_TEMP/kafka-kraft.properties"
TLS_DIR="$RUNNER_TEMP/kafka-tls"
LOG_FILE="$RUNNER_TEMP/kafka.log"
PID_FILE="$RUNNER_TEMP/kafka.pid"
rm -rf "$DATA_DIR"
mkdir -p "$LOG_DIR"
rm -rf "$TLS_DIR"
mkdir -p "$TLS_DIR"
# Generate PKCS12 artifacts for the broker from the checked-in PEM fixtures.
TLS_FIXTURES="$GITHUB_WORKSPACE/tests/fixtures/tls"
TLS_PASS="kafka-rust-test"
openssl pkcs12 -export \
-in "$TLS_FIXTURES/server.crt.pem" \
-inkey "$TLS_FIXTURES/server.key.pem" \
-certfile "$TLS_FIXTURES/ca.crt.pem" \
-name "kafka-rust-test-server" \
-out "$TLS_DIR/server.keystore.p12" \
-passout "pass:$TLS_PASS"
keytool -importcert \
-alias "kafka-rust-test-ca" \
-file "$TLS_FIXTURES/ca.crt.pem" \
-keystore "$TLS_DIR/server.truststore.p12" \
-storetype PKCS12 \
-storepass "$TLS_PASS" \
-noprompt
cat > "$CONF" <<EOF
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@127.0.0.1:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL:SASL_PLAINTEXT
listeners=PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9094,SASL://127.0.0.1:9096,CONTROLLER://127.0.0.1:9093
advertised.listeners=PLAINTEXT://127.0.0.1:9092,SSL://localhost:9094,SASL://localhost:9096
inter.broker.listener.name=PLAINTEXT
log.dirs=$LOG_DIR
sasl.enabled.mechanisms=PLAIN
listener.name.sasl.sasl.enabled.mechanisms=PLAIN
listener.name.sasl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkang" password="kafkang-secret" user_kafkang="kafkang-secret";
listener.name.sasl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler
ssl.keystore.location=$TLS_DIR/server.keystore.p12
ssl.keystore.password=$TLS_PASS
ssl.key.password=$TLS_PASS
ssl.keystore.type=PKCS12
ssl.truststore.location=$TLS_DIR/server.truststore.p12
ssl.truststore.password=$TLS_PASS
ssl.truststore.type=PKCS12
ssl.client.auth=none
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
group.initial.rebalance.delay.ms=0
EOF
CLUSTER_ID="$("$KAFKA_HOME/bin/kafka-storage.sh" random-uuid)"
"$KAFKA_HOME/bin/kafka-storage.sh" format -t "$CLUSTER_ID" -c "$CONF" --ignore-formatted
"$KAFKA_HOME/bin/kafka-server-start.sh" "$CONF" >"$LOG_FILE" 2>&1 &
echo "$!" > "$PID_FILE"
for i in {1..60}; do
if "$KAFKA_HOME/bin/kafka-broker-api-versions.sh" --bootstrap-server 127.0.0.1:9092 >/dev/null 2>&1; then
break
fi
sleep 1
done
if ! "$KAFKA_HOME/bin/kafka-broker-api-versions.sh" --bootstrap-server 127.0.0.1:9092 >/dev/null 2>&1; then
echo >&2 "Kafka failed to start"
tail -n 200 "$LOG_FILE" || true
exit 1
fi
"$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server 127.0.0.1:9092 --create --if-not-exists --topic kafka-rust-test --partitions 2 --replication-factor 1
"$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server 127.0.0.1:9092 --create --if-not-exists --topic kafka-rust-test2 --partitions 2 --replication-factor 1
- name: Run tests
run: cargo test --all-features
env:
KAFKA_CLIENT_COMPRESSION: ${{ matrix.COMPRESSION }}
KAFKA_CLIENT_SECURE: ${{ matrix.SECURE }}
- name: Show Kafka logs (on failure)
if: failure()
run: tail -n 200 "$RUNNER_TEMP/kafka.log" || true
- name: Stop Kafka
if: always()
run: |
set +e
PID_FILE="$RUNNER_TEMP/kafka.pid"
if [[ -f "$PID_FILE" ]]; then
PID="$(cat "$PID_FILE")"
kill "$PID" 2>/dev/null || true
for _ in {1..30}; do
kill -0 "$PID" 2>/dev/null || exit 0
sleep 1
done
kill -9 "$PID" 2>/dev/null || true
fi
- name: Generate documentation
run: cargo doc