crabka-client-streams 0.3.4

KIP-1071 Kafka Streams rebalance-protocol client for Apache Kafka in Rust

This crate is part of Crabka, a Rust implementation of Kafka-compatible infrastructure and clients.

Install

crabka-client-streams = "0.3.4"

For workspace development, use the path dependency from this repository instead.

Usage example

Build and run a simple source-to-sink topology using the KIP-1071 membership client:

use std::sync::Arc;
use crabka_client_streams::{StreamsEvent, StreamsMembership, Topology};

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Declare a source on "input-topic" and a sink on "output-topic" fed by it.
    let mut topo = Topology::new();
    let src = topo.add_source::<String, String>("src", ["input-topic"]);
    topo.add_sink("snk", "output-topic", [&src]);
    let built = topo.build("orders-stream")?;

    // Join the Streams group through the KIP-1071 membership client.
    let mut membership = StreamsMembership::builder()
        .bootstrap("127.0.0.1:9092")
        .group_id("orders-stream")
        .topology(Arc::new(built))
        .build()
        .await?;

    // Print the active tasks if the next event is an assignment.
    if let StreamsEvent::Assigned(assignment) = membership.next_event().await? {
        println!("active tasks: {:?}", assignment.active);
    }
    Ok(())
}

Schema-aware payloads (Avro / Protobuf / JSON)

Read and write Confluent-framed payloads whose schemas are registered and validated against a Confluent-compatible Schema Registry (e.g. crabka-schema-registry). This support is built in and needs no feature flag. The typed serdes from crabka-schema-serde plug straight into the Streams Serde<T> boundary.
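For readers unfamiliar with the term, "Confluent-framed" refers to the Confluent wire format: a zero magic byte, then a 4-byte big-endian schema id, then the encoded body. The sketch below illustrates that layout using only the standard library. The names `frame` and `unframe` are hypothetical helpers for illustration, not part of crabka-schema-serde, and the sketch ignores the message-index list that Protobuf payloads carry after the schema id.

```rust
/// Magic byte that prefixes every Confluent-framed payload.
const MAGIC_BYTE: u8 = 0;

/// Hypothetical helper: prepend the 5-byte header (magic byte plus
/// big-endian schema id) to an already-encoded body.
fn frame(schema_id: u32, body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + body.len());
    out.push(MAGIC_BYTE);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(body);
    out
}

/// Hypothetical helper: split a framed payload into its schema id and body.
/// Returns `None` if the payload is shorter than the header or the magic
/// byte is not zero.
fn unframe(payload: &[u8]) -> Option<(u32, &[u8])> {
    if payload.len() < 5 || payload[0] != MAGIC_BYTE {
        return None;
    }
    let id = u32::from_be_bytes([payload[1], payload[2], payload[3], payload[4]]);
    Some((id, &payload[5..]))
}

fn main() {
    let framed = frame(42, b"avro-bytes");
    let (id, body) = unframe(&framed).expect("valid frame");
    println!("schema id {id}, {} body bytes", body.len());
}
```

In practice the typed serdes handle this framing themselves; the sketch is only meant to show what travels on the wire.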

The serdes are topic-aware (like JVM Kafka's serialize(topic, data)): a serde carries its key/value role and derives its subject (<topic>-value / <topic>-key) from the topic. Declare a type's default serde once and use the plain add_source/add_sink:

use std::sync::Arc;
use apache_avro::AvroSchema;
use serde::{Deserialize, Serialize};
use crabka_client_streams::{DefaultSerde, SchemaPrewarm, SchemaSerde, StreamsMembership, Topology};
use crabka_schema_serde::{RegistryClient, set_default_registry, cache::{CacheConfig, SchemaCache}, format::avro::AvroSerde};

#[derive(Clone, Serialize, Deserialize, AvroSchema)]
struct Order { id: String, total: f64 }

// Order's default serde: Avro values from the process default registry.
impl DefaultSerde for Order {
    type Serde = SchemaSerde<Order, AvroSerde<Order>>;
}

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Create a schema cache for the registry and install it as the process default.
    let cache = SchemaCache::new(RegistryClient::new("http://127.0.0.1:8081"), CacheConfig::default());
    set_default_registry(cache.clone());

    let mut topo = Topology::new();
    let src = topo.add_source::<String, Order>("src", ["orders"]);
    topo.add_sink("snk", "orders-copy", [&src]);
    let built = topo.build("orders-avro")?;

    // `schema_prewarm` resolves/registers schema ids once at membership start.
    let mut membership = StreamsMembership::builder()
        .bootstrap("127.0.0.1:9092")
        .group_id("orders-avro")
        .topology(Arc::new(built))
        .maybe_schema_prewarm(Some(cache as Arc<dyn SchemaPrewarm>))
        .build()
        .await?;
    // Touch `membership` so this snippet compiles without unused warnings.
    let _ = &mut membership;
    Ok(())
}

For keys, per-topic subjects, or validation, construct the serde explicitly (AvroSerde::<T>::value(&cache) / ::key(&cache)) and use add_source_explicit/add_sink_explicit with (key_serde, value_serde).
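The topic-based subject naming described in this section (<topic>-value / <topic>-key) can be sketched on its own. This is a minimal illustration under stated assumptions: `Role` and `subject_for` are hypothetical names invented for this example and are not the crate's API; only the naming pattern itself comes from the text above.

```rust
/// Hypothetical enum: whether a serde handles the record key or the record value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Role {
    Key,
    Value,
}

/// Hypothetical helper: derive the registry subject for a topic and role,
/// following the `<topic>-key` / `<topic>-value` pattern.
fn subject_for(topic: &str, role: Role) -> String {
    match role {
        Role::Key => format!("{topic}-key"),
        Role::Value => format!("{topic}-value"),
    }
}

fn main() {
    // The same topic yields a different subject for each role.
    println!("{}", subject_for("orders", Role::Value));
    println!("{}", subject_for("orders", Role::Key));
}
```

Because the subject depends on both topic and role, a key serde and a value serde for the same topic resolve to different subjects, which is presumably why the explicit constructors distinguish ::key from ::value.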

Runnable per-format pipelines live under examples/ — run them against a live broker + registry:

cargo run -p crabka-client-streams --example avro_pipeline
cargo run -p crabka-client-streams --example protobuf_pipeline
cargo run -p crabka-client-streams --example json_pipeline

Documentation

API documentation is published on docs.rs/crabka-client-streams. The repository README contains project-wide setup, development, and release notes.

License

Apache-2.0. See the repository LICENSE and NOTICE files for details.