crabka-client-streams

KIP-1071 Kafka Streams rebalance-protocol client for Apache Kafka in Rust.
This crate is part of Crabka, a Rust implementation of Kafka-compatible infrastructure and clients.
Install
crabka-client-streams = "0.3.3"
For workspace development, use the path dependency from this repository instead.
Usage example
Build and run a simple source-to-sink topology using the KIP-1071 membership client:
use std::sync::Arc;
use crabka_client_streams::{StreamsEvent, StreamsMembership, Topology};
async fn run() -> Result<(), Box<dyn std::error::Error>> {
let mut topo = Topology::new();
let src = topo.add_source::<String, String>("src", ["input-topic"]);
topo.add_sink("snk", "output-topic", [&src]);
let built = topo.build("orders-stream")?;
let mut membership = StreamsMembership::builder()
.bootstrap("127.0.0.1:9092")
.group_id("orders-stream")
.topology(Arc::new(built))
.build()
.await?;
if let StreamsEvent::Assigned(assignment) = membership.next_event().await? {
println!("active tasks: {:?}", assignment.active);
}
Ok(())
}
Schema-aware payloads (Avro / Protobuf / JSON)
Read and write Confluent-framed payloads whose schemas are
registered/validated against a Confluent-compatible Schema Registry (e.g.
crabka-schema-registry) — built in, no feature flag. The typed serdes from
crabka-schema-serde plug straight into the Streams
Serde<T> boundary.
The serdes are topic-aware (like JVM Kafka's serialize(topic, data)): a serde
carries its key/value role and derives its subject (<topic>-value / <topic>-key)
from the topic. Declare a type's default serde once and use the plain
add_source/add_sink:
use std::sync::Arc;
use apache_avro::AvroSchema;
use serde::{Deserialize, Serialize};
use crabka_client_streams::{DefaultSerde, SchemaPrewarm, SchemaSerde, StreamsMembership, Topology};
use crabka_schema_serde::{RegistryClient, set_default_registry, cache::{CacheConfig, SchemaCache}, format::avro::AvroSerde};
#[derive(Clone, Serialize, Deserialize, AvroSchema)]
struct Order { id: String, total: f64 }
impl DefaultSerde for Order {
type Serde = SchemaSerde<Order, AvroSerde<Order>>;
}
async fn run() -> Result<(), Box<dyn std::error::Error>> {
let cache = SchemaCache::new(RegistryClient::new("http://127.0.0.1:8081"), CacheConfig::default());
set_default_registry(cache.clone());
let mut topo = Topology::new();
let src = topo.add_source::<String, Order>("src", ["orders"]);
topo.add_sink("snk", "orders-copy", [&src]);
let built = topo.build("orders-avro")?;
let mut membership = StreamsMembership::builder()
.bootstrap("127.0.0.1:9092")
.group_id("orders-avro")
.topology(Arc::new(built))
.maybe_schema_prewarm(Some(cache as Arc<dyn SchemaPrewarm>))
.build()
.await?;
Ok(())
}
For keys, per-topic subjects, or validation, construct the serde explicitly
(AvroSerde::<T>::value(&cache) / ::key(&cache)) and use
add_source_explicit/add_sink_explicit with (key_serde, value_serde).
Runnable per-format pipelines live under examples/ — run them
against a live broker + registry:
cargo run -p crabka-client-streams --example avro_pipeline
cargo run -p crabka-client-streams --example protobuf_pipeline
cargo run -p crabka-client-streams --example json_pipeline
Documentation
API documentation is published on docs.rs/crabka-client-streams. The repository README contains project-wide setup, development, and release notes.
License
Apache-2.0. See the repository LICENSE and NOTICE files for details.