# kafkang 0.3.0
#
# Rust client for Apache Kafka
# Documentation
# CI workflow for the crate: runs on pushes and pull requests that target
# the main/master branches.
name: Rust

on:
  push:
    branches: ["main", "master"]
  pull_request:
    branches: ["main", "master"]

# Least privilege for GITHUB_TOKEN: the steps visible in this workflow only
# need to read the repository. Widen per job if a job ever needs more.
permissions:
  contents: read

env:
  # Force colored cargo output in the Actions log.
  CARGO_TERM_COLOR: always
  # Limit the Rust test harness to one thread (presumably because the tests
  # share a single broker; confirm against the test suite). Quoted because
  # environment values are strings.
  RUST_TEST_THREADS: "1"

jobs:
  # Static checks: formatting and Clippy on the pinned toolchain.
  lint:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    # Install the pinned toolchain with the runner's own rustup and make it
    # the default. The archived actions-rs/toolchain action only installs the
    # toolchain unless `default` or `override` is set, so the cargo commands
    # below would otherwise run on the runner's default toolchain.
    - name: Set up Rust
      run: |
        rustup toolchain install 1.85.0 --profile minimal --component rustfmt,clippy --no-self-update
        rustup default 1.85.0

    - name: Check formatting
      run: cargo fmt --all --check

    - name: Clippy
      run: cargo clippy --all-targets --all-features

  # Integration tests: one job per (Kafka version, compression, security)
  # combination listed in the matrix below.
  test:
    runs-on: ubuntu-latest
    strategy:
      # Let every combination run to completion; with the default
      # (fail-fast: true) the first failure cancels the remaining jobs and
      # hides which other combinations are affected.
      fail-fast: false
      matrix:
        include:
          # Kafka version sweep, plaintext and uncompressed.
          - KAFKA_VER: "3.8.1"
            COMPRESSION: "NONE"
            SECURE: ""
          - KAFKA_VER: "3.9.1"
            COMPRESSION: "NONE"
            SECURE: ""
          - KAFKA_VER: "4.0.1"
            COMPRESSION: "NONE"
            SECURE: ""
          - KAFKA_VER: "4.1.0"
            COMPRESSION: "NONE"
            SECURE: ""
          # Compression variants on the newest version.
          - KAFKA_VER: "4.1.0"
            COMPRESSION: "GZIP"
            SECURE: ""
          - KAFKA_VER: "4.1.0"
            COMPRESSION: "SNAPPY"
            SECURE: ""
          # Transport security variants on the newest version.
          - KAFKA_VER: "4.1.0"
            COMPRESSION: "NONE"
            SECURE: "tls"
          - KAFKA_VER: "4.1.0"
            COMPRESSION: "NONE"
            SECURE: "sasl_plaintext"
    steps:
    - uses: actions/checkout@v4

    # Install the pinned toolchain with the runner's own rustup and make it
    # the default. The archived actions-rs/toolchain action only installs the
    # toolchain unless `default` or `override` is set, so later cargo steps
    # would otherwise run on the runner's default toolchain.
    - name: Set up Rust
      run: |
        rustup toolchain install 1.85.0 --profile minimal --component rustfmt,clippy --no-self-update
        rustup default 1.85.0

    # JDK used by keytool and the Kafka scripts invoked later in this job.
    - name: Set up Java
      uses: actions/setup-java@v4
      with:
        distribution: temurin
        java-version: "17"

    # Keep the unpacked Kafka distribution between runs; entries are keyed by
    # runner OS and Kafka version.
    - uses: actions/cache@v4
      name: Cache Kafka distribution
      with:
        key: apache-kafka-${{ runner.os }}-${{ matrix.KAFKA_VER }}
        path: ~/.cache/apache-kafka/${{ matrix.KAFKA_VER }}

    # Compile the crate with default features before any broker is started.
    - run: cargo build
      name: Build

    # Fetch and unpack the Kafka release unless a usable copy already exists
    # in the cache directory, then export KAFKA_HOME to the following steps.
    - name: Download Kafka (if needed)
      env:
        # Pass the matrix value through the environment instead of
        # interpolating it into the script text.
        KAFKA_VER: ${{ matrix.KAFKA_VER }}
      run: |
        set -euo pipefail

        CACHE_ROOT="$HOME/.cache/apache-kafka"
        KAFKA_HOME="$CACHE_ROOT/$KAFKA_VER"
        UNPACKED="$CACHE_ROOT/kafka_2.13-${KAFKA_VER}"

        # Check for the start script rather than just the directory, so an
        # empty or partial directory is not mistaken for a usable copy.
        if [[ -f "$KAFKA_HOME/bin/kafka-server-start.sh" ]]; then
          echo "Using cached Kafka at $KAFKA_HOME"
        else
          rm -rf "$KAFKA_HOME" "$UNPACKED"
          mkdir -p "$CACHE_ROOT"

          # Download to a file first: curl can only retry safely when it has
          # not already streamed part of the archive into tar.
          TARBALL="$RUNNER_TEMP/kafka_2.13-${KAFKA_VER}.tgz"
          curl -fsSL --retry 3 --retry-delay 5 -o "$TARBALL" "https://archive.apache.org/dist/kafka/${KAFKA_VER}/kafka_2.13-${KAFKA_VER}.tgz"
          tar xzf "$TARBALL" -C "$CACHE_ROOT"
          rm -f "$TARBALL"
          mv "$UNPACKED" "$KAFKA_HOME"
        fi

        echo "KAFKA_HOME=$KAFKA_HOME" >> "$GITHUB_ENV"

    # Start a single-node KRaft broker in the background with PLAINTEXT
    # (9092), SSL (9094) and SASL/PLAIN (9096) listeners, wait until it
    # answers on the PLAINTEXT listener, then create the two test topics.
    # The broker PID is written to $RUNNER_TEMP/kafka.pid and its output to
    # $RUNNER_TEMP/kafka.log for the later steps.
    - name: Start Kafka (KRaft, no Docker)
      run: |
        set -euo pipefail

        DATA_DIR="$RUNNER_TEMP/kafka-data-${{ matrix.KAFKA_VER }}"
        LOG_DIR="$DATA_DIR/logs"
        CONF="$RUNNER_TEMP/kafka-kraft.properties"
        TLS_DIR="$RUNNER_TEMP/kafka-tls"
        LOG_FILE="$RUNNER_TEMP/kafka.log"
        PID_FILE="$RUNNER_TEMP/kafka.pid"

        # Always start from empty data and TLS directories.
        rm -rf "$DATA_DIR"
        mkdir -p "$LOG_DIR"
        rm -rf "$TLS_DIR"
        mkdir -p "$TLS_DIR"

        # Generate PKCS12 artifacts for the broker from the checked-in PEM fixtures.
        TLS_FIXTURES="$GITHUB_WORKSPACE/tests/fixtures/tls"
        TLS_PASS="kafka-rust-test"

        # Keystore: server certificate and key plus the CA certificate.
        openssl pkcs12 -export \
          -in "$TLS_FIXTURES/server.crt.pem" \
          -inkey "$TLS_FIXTURES/server.key.pem" \
          -certfile "$TLS_FIXTURES/ca.crt.pem" \
          -name "kafka-rust-test-server" \
          -out "$TLS_DIR/server.keystore.p12" \
          -passout "pass:$TLS_PASS"

        # Truststore: the CA certificate only.
        keytool -importcert \
          -alias "kafka-rust-test-ca" \
          -file "$TLS_FIXTURES/ca.crt.pem" \
          -keystore "$TLS_DIR/server.truststore.p12" \
          -storetype PKCS12 \
          -storepass "$TLS_PASS" \
          -noprompt

        # Unquoted heredoc: $LOG_DIR, $TLS_DIR and $TLS_PASS are expanded by
        # the shell. No comments are added inside it because every line
        # becomes part of the properties file.
        cat > "$CONF" <<EOF
        process.roles=broker,controller
        node.id=1
        controller.quorum.voters=1@127.0.0.1:9093
        controller.listener.names=CONTROLLER
        listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL:SASL_PLAINTEXT
        listeners=PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9094,SASL://127.0.0.1:9096,CONTROLLER://127.0.0.1:9093
        advertised.listeners=PLAINTEXT://127.0.0.1:9092,SSL://localhost:9094,SASL://localhost:9096
        inter.broker.listener.name=PLAINTEXT

        log.dirs=$LOG_DIR

        sasl.enabled.mechanisms=PLAIN
        listener.name.sasl.sasl.enabled.mechanisms=PLAIN
        listener.name.sasl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafkang" password="kafkang-secret" user_kafkang="kafkang-secret";
        listener.name.sasl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler

        ssl.keystore.location=$TLS_DIR/server.keystore.p12
        ssl.keystore.password=$TLS_PASS
        ssl.key.password=$TLS_PASS
        ssl.keystore.type=PKCS12
        ssl.truststore.location=$TLS_DIR/server.truststore.p12
        ssl.truststore.password=$TLS_PASS
        ssl.truststore.type=PKCS12
        ssl.client.auth=none

        offsets.topic.replication.factor=1
        transaction.state.log.replication.factor=1
        transaction.state.log.min.isr=1
        group.initial.rebalance.delay.ms=0
        EOF

        CLUSTER_ID="$("$KAFKA_HOME/bin/kafka-storage.sh" random-uuid)"
        "$KAFKA_HOME/bin/kafka-storage.sh" format -t "$CLUSTER_ID" -c "$CONF" --ignore-formatted

        "$KAFKA_HOME/bin/kafka-server-start.sh" "$CONF" >"$LOG_FILE" 2>&1 &
        KAFKA_PID="$!"
        echo "$KAFKA_PID" > "$PID_FILE"

        # Probe the PLAINTEXT listener up to 60 times. Remember a successful
        # probe instead of repeating it after the loop.
        READY=0
        for _ in {1..60}; do
          if "$KAFKA_HOME/bin/kafka-broker-api-versions.sh" --bootstrap-server 127.0.0.1:9092 >/dev/null 2>&1; then
            READY=1
            break
          fi
          # Stop waiting as soon as the background process launched above
          # has exited; further probes cannot succeed.
          if ! kill -0 "$KAFKA_PID" 2>/dev/null; then
            echo >&2 "Kafka process exited during startup"
            break
          fi
          sleep 1
        done

        if [[ "$READY" -ne 1 ]]; then
          echo >&2 "Kafka failed to start"
          tail -n 200 "$LOG_FILE" || true
          exit 1
        fi

        "$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server 127.0.0.1:9092 --create --if-not-exists --topic kafka-rust-test --partitions 2 --replication-factor 1
        "$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server 127.0.0.1:9092 --create --if-not-exists --topic kafka-rust-test2 --partitions 2 --replication-factor 1

    # Run the full test suite with the matrix selection exposed to the test
    # process through environment variables.
    - name: Run tests
      env:
        KAFKA_CLIENT_SECURE: ${{ matrix.SECURE }}
        KAFKA_CLIENT_COMPRESSION: ${{ matrix.COMPRESSION }}
      run: cargo test --all-features

    # Only when an earlier step failed: print the tail of the broker log.
    # The `|| true` keeps this step green even if the log file is missing.
    - name: Show Kafka logs (on failure)
      if: ${{ failure() }}
      run: |
        tail -n 200 "$RUNNER_TEMP/kafka.log" || true

    # Best-effort shutdown that runs whether or not earlier steps passed:
    # signal the recorded PID, wait up to 30 seconds for it to exit, then
    # send SIGKILL.
    - name: Stop Kafka
      if: ${{ always() }}
      run: |
        set +e
        pid_file="$RUNNER_TEMP/kafka.pid"

        # No PID file means there is nothing to stop.
        [[ -f "$pid_file" ]] || exit 0

        broker_pid="$(<"$pid_file")"
        kill "$broker_pid" 2>/dev/null || true

        remaining=30
        while (( remaining > 0 )); do
          if ! kill -0 "$broker_pid" 2>/dev/null; then
            exit 0
          fi
          sleep 1
          remaining=$(( remaining - 1 ))
        done

        kill -9 "$broker_pid" 2>/dev/null || true

    # Build the API documentation as a final check.
    - run: cargo doc
      name: Generate documentation