crabka_client_streams/lib.rs
1//! Kafka Streams-compatible client runtime for Crabka.
2//!
3//! `crabka-client-streams` provides three layers that can be used independently:
4//!
5//! - [`StreamsBuilder`] builds JVM-compatible KStream/KTable topologies for
6//! common application code: map/filter chains, aggregations, joins, windows,
7//! suppression, global tables, and custom Processor-API nodes.
8//! - [`Topology`] is the typed Processor API for applications that want explicit
9//! source, processor, sink, and state-store wiring.
10//! - [`KafkaStreams`] runs a built topology against a Kafka-compatible broker by
11//! joining a KIP-1071 streams group, processing assigned input partitions,
12//! producing sink records, restoring changelog-backed stores, and serving local
13//! interactive queries.
14//!
15//! For broker-free tests, [`TopologyTestDriver`] executes the same built topology
16//! in process. The driver is the fastest way to exercise business logic and state
17//! stores before running with [`KafkaStreams`].
18//!
19//! ## Quick start
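20//!
//! The snippet below uses [`StreamsMembership`] directly: it joins the streams group and
//! reports task assignments as [`StreamsEvent`]s. To also fetch, process, and produce
//! records, run the topology with [`KafkaStreams`] instead (see *Running an app* below).
//!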
21//! ```no_run
22//! use crabka_client_streams::{NodeHandle, StreamsEvent, StreamsMembership, Topology};
23//! use std::time::Duration;
24//!
25//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
26//! let mut topo = Topology::new();
27//! let src: NodeHandle<String, String> = topo.add_source("src", ["input-topic"]);
28//! topo.add_sink("snk", "output-topic", [&src]);
29//! let built = topo.build("my-application-id")?;
30//!
31//! let mut membership = StreamsMembership::builder()
32//! .bootstrap("localhost:9092")
33//! .group_id("my-application-id")
34//! .topology(std::sync::Arc::new(built))
35//! .build()
36//! .await?;
37//!
38//! loop {
39//! match membership.next_event().await? {
40//! StreamsEvent::Assigned(a) => {
41//! for task in &a.active {
42//! println!(
43//! "active task {} → {:?}",
44//! task.subtopology_id, task.source_topic_partitions
45//! );
46//! }
47//! }
48//! StreamsEvent::NotReady(statuses) => println!("not ready: {statuses:?}"),
49//! StreamsEvent::Fenced => println!("rejoined after fence"),
50//! }
51//! }
52//! # }
53//! ```
54//! ## Processor API
55//!
56//! Define a typed topology, then test it with the broker-free [`TopologyTestDriver`]:
57//!
58//! ```
59//! use crabka_client_streams::{
60//! NodeHandle, Record, StringSerde, Topology, TopologyTestDriver, impl_processor,
61//! };
62//!
63//! struct Upper;
64//! impl_processor! {
65//! impl Upper: (String, String) -> (String, String) {
66//! async fn process(&mut self, ctx, r) {
67//! ctx.forward(Record::new(r.key, r.value.to_uppercase(), r.timestamp));
68//! }
69//! }
70//! }
71//!
72//! let mut topo = Topology::new();
73//! let src: NodeHandle<String, String> = topo.add_source("src", ["in"]);
74//! let up = topo.add_processor("up", || Upper, [&src]);
75//! topo.add_sink("out", "out", [&up]);
76//! let built = topo.build("my-app").unwrap();
77//!
78//! let mut driver = TopologyTestDriver::new(&built).unwrap();
79//! driver.pipe_input(
80//! "in",
81//! (StringSerde, StringSerde),
82//! Some("k".to_string()),
83//! "hello".to_string(),
84//! 0,
85//! );
86//! assert_eq!(
87//! driver.read_output("out", (StringSerde, StringSerde)),
88//! Some((Some("k".to_string()), "HELLO".to_string())),
89//! );
90//! ```
91//!
92//! Nodes are wired by handle, not by string name, so a mis-typed edge is a
93//! **compile error** rather than a `build()`-time failure:
94//!
95//! ```compile_fail
96//! use crabka_client_streams::{NodeHandle, Topology};
97//!
98//! let mut topo = Topology::new();
99//! // `src` produces Record<String, String>:
100//! let src: NodeHandle<String, String> = topo.add_source("src", ["in"]);
101//! // but this sink expects Record<String, i64> — won't compile:
102//! topo.add_sink::<String, i64>("out", "out", [&src]);
103//! ```
104//!
105//! ## State stores
106//!
107//! Processors can persist and restore keyed state via a named [`KeyValueStore`].
108//! The store is attached to the topology with `add_state_store`, and accessed
109//! inside `process` via [`ProcessorContext::get_state_store`].
110//!
111//! ```
112//! use crabka_client_streams::{
113//! I64Serde, NodeHandle, Record, StringSerde, Topology, TopologyTestDriver, impl_processor,
114//! };
115//!
116//! struct Counter;
117//! impl_processor! {
118//! impl Counter: (String, String) -> (String, i64) {
119//! async fn process(&mut self, ctx, r) {
120//! let n = {
121//! let s = ctx.get_state_store::<String, i64>("counts").unwrap();
122//! let n = s.get(&r.value).await.unwrap_or(0) + 1;
123//! s.put(r.value.clone(), n).await;
124//! n
125//! };
126//! ctx.forward(Record::new(Some(r.value), n, r.timestamp));
127//! }
128//! }
129//! }
130//!
131//! let mut topo = Topology::new();
132//! let src: NodeHandle<String, String> = topo.add_source("src", ["in"]);
133//! let c = topo.add_processor("c", || Counter, [&src]);
134//! topo.add_state_store("counts", StringSerde, I64Serde, [c.name()]);
135//! topo.add_sink("out", "out", [&c]);
136//! let built = topo.build("app").unwrap();
137//!
138//! let mut driver = TopologyTestDriver::new(&built).unwrap();
139//! driver.pipe_input("in", (StringSerde, StringSerde), None, "a".to_string(), 0);
140//! driver.pipe_input("in", (StringSerde, StringSerde), None, "a".to_string(), 1);
141//! assert_eq!(
142//! driver.read_output("out", (StringSerde, I64Serde)),
143//! Some((Some("a".to_string()), 1_i64)),
144//! );
145//! assert_eq!(
146//! driver.read_output("out", (StringSerde, I64Serde)),
147//! Some((Some("a".to_string()), 2_i64)),
148//! );
149//! assert_eq!(
150//! driver.store_get::<String, i64>("counts", &"a".to_string()),
151//! Some(2_i64)
152//! );
153//! ```
154//!
155//! ## DSL (KStream/KTable)
156//!
157//! [`StreamsBuilder`] is the high-level DSL entry point. It wires a topology
158//! from source streams through stateless transforms, aggregations, and sinks
159//! without writing explicit [`Processor`] implementations. The resulting
160//! [`BuiltTopology`] is interchangeable with the Processor-API variant — run it
161//! with [`TopologyTestDriver`] for broker-free testing or [`KafkaStreams`] for
162//! production.
163//!
164//! A [`KTable`] is internally a *change stream*: each record carries a
165//! `Change { old_value, new_value }`, and `filter` emits tombstones (a row that stops
166//! matching the predicate is deleted downstream with `new_value = None`).
167//! [`KStream::to_table`] materializes a stream into a [`KTable`] backed by a
168//! named [`Materialized`] store. [`KTable::to_stream`] forwards update records
169//! and drops tombstones from the output stream.
170//!
171//! [`KStream::join_table`] and [`KStream::left_join_table`] join a stream against
172//! a **materialized** `KTable`: the stream side drives, and for each record the
173//! table store is looked up by key. An inner join emits only when the table has a
174//! matching entry; a left join always emits (with `None` as the table value when
175//! absent). The stream must be **copartitioned** with the table (same key serde
176//! and partition count); a key-changing stream must be `.repartition(..)`-ed
177//! before joining — the join itself inserts no implicit repartition. (The plain
178//! [`KStream::join`]/[`KStream::left_join`] names are the windowed *stream-stream*
179//! join below — Rust can't overload by argument type as the JVM does.)
180//!
181//! [`KTable::join`], [`KTable::left_join`], and [`KTable::outer_join`] join two
182//! **materialized** `KTable`s. Unlike the stream-table join, a change on *either*
183//! side recomputes the join: the changed side re-reads the other side's current
184//! value from its store and forwards a `Change` (a tombstone when the joined row
185//! stops existing). Inner emits only when both sides hold a value; left emits
186//! whenever the left side is present; outer emits whenever either side is. The two
187//! source topics are declared as a **copartition group**, and the result is an
188//! unmaterialized `KTable` (no result store/changelog — materialize a downstream op
189//! to persist it).
190//!
191//! ## Foreign-key joins
192//!
193//! [`KTable::join_on_foreign_key`] and [`KTable::left_join_on_foreign_key`]
194//! (KIP-213) join two `KTable`s on a **foreign key** rather than the primary key:
195//! for each left row, an `fk_extractor(&leftValue)` selects the foreign key, which
196//! looks up a row in the right table. The relationship is **many-to-one** — many
197//! left rows can reference the same right row, and a change on *either* side
198//! re-evaluates every affected pair: a left-value change re-selects the foreign
199//! key, and a right-row change re-emits for every left row currently subscribed to
200//! that foreign key. **Inner** emits `joiner(&left, &right)` only when the foreign
201//! row exists (a foreign key with no match retracts with a tombstone); **left**
202//! emits for every left row, passing `None` for the foreign value on a miss.
203//!
204//! Both input tables must be **materialized source tables** — built with
205//! [`StreamsBuilder::table`] (the join reads each side's store and serdes). The
206//! result is an **unmaterialized** `KTable` (no result store/changelog; materialize
207//! a downstream op to persist it). Because the foreign key differs from the primary
208//! key, the join cannot be copartitioned directly; it lowers to the KIP-213
209//! two-subtopology graph — a *subscription registration* repartition topic (keyed
210//! by foreign key), a *subscription response* repartition topic (keyed back by
211//! primary key), and a subscription state store that tracks which primary keys
212//! subscribe to each foreign key — all created and copartitioned automatically.
213//!
214//! ```no_run
215//! use crabka_client_streams::{StreamsBuilder, StringSerde};
216//!
217//! let builder = StreamsBuilder::new();
218//! // `a`: primaryKey -> foreignKey ("A"); `b`: foreignKey -> value ("X").
219//! let a = builder.table::<String, String>("a", "sa");
220//! let b = builder.table::<String, String>("b", "sb");
221//! a.join_on_foreign_key(
222//! &b,
223//! |left: &String| left.clone(), // foreign-key extractor
224//! |left: &String, right: &String| format!("{left}{right}"), // joiner -> "AX"
225//! StringSerde, // foreign-key serde
226//! )
227//! .to_stream()
228//! .to("out");
229//! drop(a);
230//! drop(b);
231//! let topology = builder.build("fk-app").unwrap();
232//! # let _ = topology;
233//! ```
234//!
//! ## Windowing, suppression, and other DSL operators
//!
235//! [`KGroupedStream::windowed_by`] turns a grouped stream into time-windowed
236//! aggregations: `windowed_by(TimeWindows::of_size(..))` followed by `count`, `reduce`,
237//! or `aggregate` yields a [`KTable`]`<`[`Windowed`]`<K>, V>`. [`TimeWindows`] are
238//! tumbling (`of_size`) or hopping (`.advance_by(..)`); each record is aggregated
239//! into every window it falls into, and a result is emitted on **every update**.
240//! Add [`KTable::suppress`] with [`Suppressed::until_window_closes`] when the
241//! application wants one final result after the window closes. The windowed store is a
242//! [`Window`]-keyed store over the same pluggable backend, with a `compact,delete`
243//! changelog (`retention.ms = size + grace + 1 day`). Read the windowed output
244//! with [`TimeWindowedSerde`] (the key carries the window start).
245//!
246//! [`KGroupedStream::windowed_by_session`] groups records into data-driven
247//! **session windows**: records for a key form one session `[start, end]` while
248//! they stay within an inactivity [`SessionWindows`] gap. Terminal `count` /
249//! `reduce` / `aggregate` (the last taking a session merger) yield a
250//! [`KTable`]`<`[`Windowed`]`<K>, V>`. Each record merges every session within the
251//! gap into one `[minStart, maxEnd]` session — emitting a tombstone for each merged-away
252//! session plus an update for the new merged session (Kafka Streams session-window
253//! semantics, emit-on-update). The session store keys by `key‖end‖start` (a third typed store
254//! over the pluggable backend); read the output with [`SessionWindowedSerde`].
255//!
256//! [`KGroupedStream::windowed_by_sliding`] produces a
257//! [`SlidingWindowedKGroupedStream`] with `count`/`reduce`/`aggregate`. Sliding
258//! windows are **data-defined** inclusive windows of fixed size
259//! `time_difference_ms`: a record at time `t` falls into every window
260//! `[ws, ws + time_difference]` with `ws ∈ [t - time_difference, t]`. Unlike
261//! tumbling/hopping windows there is no epoch alignment; the aggregator
262//! discovers affected windows by scanning the window store and emits on update.
263//! Out-of-order records within `time_difference + grace` are folded into the
264//! windows they belong to. The output is a `KTable<Windowed<K>, _>` reusing the
265//! [`TimeWindowedSerde`] output-key layout (`key‖windowStart:8B-BE`).
266//!
267//! [`KTable::suppress`]`(`[`Suppressed`]`::until_window_closes(`[`BufferConfig`]`::unbounded()))`
268//! turns a windowed table's emit-on-update change-stream into **final results**: it
269//! buffers each window's updates and forwards the window's final value exactly once,
270//! when stream-time passes `window.end + grace` (the grace comes from the upstream
271//! windowed/session aggregation). [`Suppressed::until_time_limit`] is the
272//! rate-limiter variant for *any* table — it emits at most one update per key per
273//! wait interval (measured in stream-time), and a newer record resets the timer.
274//!
275//! The buffer is a **registered, durable state store** (a time-ordered
276//! `SuppressBytesStore` keyed by the serialized record key). With logging on (the
277//! default) it writes a **JVM-byte-exact** changelog — `BufferValue` +
278//! `ProcessorRecordContext` value, a plain `cleanup.policy=compact` topic
279//! `app-KTABLE-SUPPRESS-STATE-STORE-<n>-changelog` — and restores the buffered
280//! records on restart via the same machinery as every other store, so windows that
281//! were still buffered re-emit on close after a restart. [`Suppressed::with_logging_disabled`]
282//! keeps the buffer in memory only (no changelog topic). The serdes reach the store
283//! from the producing op (the windowed/session aggregation or [`StreamsBuilder::table`]).
284//!
285//! The buffer's size is governed by [`BufferConfig`]: [`BufferConfig::unbounded`]`().with_max_records(n)`
286//! and [`BufferConfig::with_max_bytes`]`(n)` cap it (bytes = serialized key + value,
287//! summed across the buffered records). When a cap is exceeded, the buffer either shuts
288//! the task down (`shutDownWhenFull`, the `until_window_closes` default) or — for a config
289//! built with `BufferConfig::max_records(n)`, [`BufferConfig::max_bytes`] (eager), or
290//! [`BufferConfig::emit_early_when_full`] — evicts and emits the oldest buffered record (`emitEarlyWhenFull`).
291//!
292//! [`KStream::join`], [`KStream::left_join`], and [`KStream::outer_join`] are the
293//! windowed **stream-stream** joins: two streams join over a [`JoinWindows`] time
294//! window, configured with [`StreamJoined`] serdes. Each side buffers its records
295//! in its own `retainDuplicates` window store (so two records at the same time
296//! both survive); a record from one side joins every record on the other side
297//! within `[t - before, t + after]`, emitting `joiner(a, b)` at `max(ts)`. The two
298//! window-store changelogs use `cleanup.policy=delete` (`retention.ms = before +
299//! after + grace + 1 day`), and the two source topics form a copartition group. An
300//! inner join emits only on a match; **left**/**outer** additionally emit the
301//! null-padded result for a record that finds no match, once its window has closed
302//! (KIP-633 stream-time-driven emission — there is no wall-clock throttle). Left and
303//! outer joins buffer the as-yet-unmatched records in a shared `KSTREAM-OUTERSHARED-`
304//! KV store (a compact changelog) and rename their per-side processors to
305//! `KSTREAM-OUTERTHIS-`/`KSTREAM-OUTEROTHER-` to match the JVM. As with the other
306//! joins, a key-changing stream must `.repartition(..)` before joining.
307//!
308//! [`StreamsBuilder::global_table`] sources a [`GlobalKTable`]: a **fully-replicated**
309//! lookup table. Every application instance reads *all* partitions of the source
310//! topic into one shared global store, so the source topic itself is the truth —
311//! there is **no copartitioning, no repartition, and no changelog** (the global
312//! store is rebuilt from the source on startup). The store is *invisible in the
313//! wire topology* (no subtopology of its own), though its global source node still
314//! consumes a node-group index during grouping (so declaring `global_table` before
315//! `stream` shifts the stream subtopology id). [`KStream::join_global`] /
316//! [`KStream::left_join_global`] join a stream to it by a **per-record-derived
317//! key** — `key_mapper(&streamKey, &streamValue)` selects the global key (which may
318//! differ from the stream key) — and emit `joiner(&streamValue, &globalValue)` keyed
319//! by the *stream* key. An inner `join_global` skips a record on a store miss; a
320//! `left_join_global` always emits, passing `None` for the global side. Because the
321//! store is fully replicated, any record can look up any key on every instance.
322//! The runtime's global consumer **bootstraps** the store — draining every partition
323//! of the source topic to end-of-log — *before* any task begins processing, so the
324//! first joined record already sees the complete global table.
325//!
326//! [`KStream::process`] and [`KStream::process_values`] (KIP-820) drop a custom
327//! Processor-API node into a DSL pipeline: a user-written [`Processor`] (for
328//! `process`) or [`FixedKeyProcessor`] (for `process_values`) that reads and writes
329//! state stores connected by name. Register the store first with
330//! [`StreamsBuilder::add_state_store`] (a compact-changelog [`KeyValueStore`]), then
331//! pass its name to the `process`/`process_values` call that uses it — the named
332//! store is attached to that node and its `app-<store>-changelog` topic appears in
333//! the wire. `process` may rewrite the record key, so its result is
334//! **key-changing**: a downstream `group_by_key`/join inserts a repartition.
335//! `process_values` is **fixed-key** — it can change the value but not the key — so
336//! it carries the upstream key lineage and forces **no** repartition. That guarantee
337//! is structural: a [`FixedKeyProcessor`] only ever receives and forwards a
338//! [`FixedKeyRecord`], whose key is fixed from the input and preserved through
339//! [`FixedKeyRecord::with_value`]; the context's only `forward` re-attaches that key,
340//! so the processor cannot emit a different one. (An `add_state_store` store that no
341//! `process`/`process_values` connects is simply never instantiated — no changelog,
342//! no runtime store.)
343//!
344//! ### Enriching a stream with a fully replicated table
345//!
346//! `GlobalKTable` is useful for reference data such as customer profiles,
347//! product catalogs, or fraud watchlists where every app instance should be able
348//! to look up any key without repartitioning the stream:
349//!
350//! ```
351//! use crabka_client_streams::{StreamsBuilder, StringSerde};
352//!
353//! let b = StreamsBuilder::new();
354//! let customers = b.global_table::<String, String>("customers", "customers-by-id");
355//!
356//! b.stream::<String, String>(["orders"])
357//! .left_join_global(
358//! &customers,
359//! |_order_id, order| order.split(':').next().unwrap_or("").to_string(),
360//! |order, customer| format!("{order}|customer={}", customer.map_or("unknown", |v| v)),
361//! )
362//! .to("enriched-orders");
363//!
364//! drop(customers);
365//! let built = b.build("orders-enricher").unwrap();
366//! assert_eq!(built.list_source_topics(), vec!["orders".to_string()]);
367//! ```
368//!
369//! The same enrichment with **Avro** payloads and rich compound types — declare
370//! each type's default serde once and the DSL reads/writes Confluent-framed
371//! records resolved against the schema registry (no per-call serde wiring):
372//!
373//! ```
374//! use apache_avro::AvroSchema;
375//! use crabka_client_streams::{DefaultSerde, SchemaSerde, StreamsBuilder};
376//! use crabka_schema_serde::cache::{CacheConfig, SchemaCache};
377//! use crabka_schema_serde::format::avro::AvroSerde;
378//! use crabka_schema_serde::{RegistryClient, set_default_registry};
379//! use serde::{Deserialize, Serialize};
380//!
381//! #[derive(Clone, Serialize, Deserialize, AvroSchema)]
382//! struct Order {
383//! order_id: String,
384//! customer_id: String,
385//! amount_cents: i64,
386//! }
387//! #[derive(Clone, Serialize, Deserialize, AvroSchema)]
388//! struct Customer {
389//! customer_id: String,
390//! name: String,
391//! tier: Tier,
392//! region: String,
393//! }
394//! #[derive(Clone, Copy, Serialize, Deserialize, AvroSchema)]
395//! enum Tier {
396//! Standard,
397//! Gold,
398//! Platinum,
399//! }
400//! #[derive(Clone, Serialize, Deserialize, AvroSchema)]
401//! struct EnrichedOrder {
402//! order_id: String,
403//! customer: String,
404//! tier: Tier,
405//! amount_cents: i64,
406//! }
407//!
408//! impl DefaultSerde for Order {
409//! type Serde = SchemaSerde<Order, AvroSerde<Order>>;
410//! }
411//! impl DefaultSerde for Customer {
412//! type Serde = SchemaSerde<Customer, AvroSerde<Customer>>;
413//! }
414//! impl DefaultSerde for EnrichedOrder {
415//! type Serde = SchemaSerde<EnrichedOrder, AvroSerde<EnrichedOrder>>;
416//! }
417//!
418//! // Point the default serdes at a registry (not contacted until the app runs).
419//! set_default_registry(SchemaCache::new(
420//! RegistryClient::new("http://localhost:8081"),
421//! CacheConfig::default(),
422//! ));
423//!
424//! let b = StreamsBuilder::new();
425//! let customers = b.global_table::<String, Customer>("customers", "customers-by-id");
426//! b.stream::<String, Order>(["orders"])
427//! .left_join_global(
428//! &customers,
429//! |_order_key, order| order.customer_id.clone(),
430//! |order, customer| EnrichedOrder {
431//! order_id: order.order_id.clone(),
432//! customer: customer.map_or_else(|| "unknown".into(), |c| c.name.clone()),
433//! tier: customer.map_or(Tier::Standard, |c| c.tier),
434//! amount_cents: order.amount_cents,
435//! },
436//! )
437//! .to("enriched-orders");
438//! drop(customers);
439//! let built = b.build("orders-enricher-avro").unwrap();
440//! assert_eq!(built.list_source_topics(), vec!["orders".to_string()]);
441//! ```
442//!
443//! ### Final windowed counts
444//!
445//! Windowed aggregations emit on every update by default. Add `suppress` when
446//! downstream systems should receive only the final value once the window's grace
447//! period has elapsed:
448//!
449//! ```
450//! use crabka_client_streams::{BufferConfig, StreamsBuilder, Suppressed, TimeWindows};
451//!
452//! let b = StreamsBuilder::new();
453//! b.stream::<String, String>(["clicks"])
454//! .group_by_key()
455//! .windowed_by(TimeWindows::of_size(60_000).grace(10_000))
456//! .count("click-counts")
457//! .suppress(Suppressed::until_window_closes(BufferConfig::unbounded()))
458//! .to_stream()
459//! .to("click-counts-final");
460//!
461//! let built = b.build("click-analytics").unwrap();
462//! assert_eq!(
463//! built.list_sink_topics(),
464//! vec!["click-counts-final".to_string()]
465//! );
466//! ```
467//!
468//! A windowed aggregation of the same shape over **Avro** orders, accumulating a compound
469//! per-window revenue record (the aggregation state is itself an Avro record in
470//! the windowed store):
471//!
472//! ```
473//! use apache_avro::AvroSchema;
474//! use crabka_client_streams::{
475//! BufferConfig, DefaultSerde, SchemaSerde, StreamsBuilder, Suppressed, TimeWindows,
476//! };
477//! use crabka_schema_serde::cache::{CacheConfig, SchemaCache};
478//! use crabka_schema_serde::format::avro::AvroSerde;
479//! use crabka_schema_serde::{RegistryClient, set_default_registry};
480//! use serde::{Deserialize, Serialize};
481//!
482//! #[derive(Clone, Serialize, Deserialize, AvroSchema)]
483//! struct Order {
484//! order_id: String,
485//! region: String,
486//! amount_cents: i64,
487//! }
488//! #[derive(Clone, Serialize, Deserialize, AvroSchema)]
489//! struct Revenue {
490//! order_count: i64,
491//! gross_cents: i64,
492//! }
493//!
494//! impl DefaultSerde for Order {
495//! type Serde = SchemaSerde<Order, AvroSerde<Order>>;
496//! }
497//! impl DefaultSerde for Revenue {
498//! type Serde = SchemaSerde<Revenue, AvroSerde<Revenue>>;
499//! }
500//!
501//! set_default_registry(SchemaCache::new(
502//! RegistryClient::new("http://localhost:8081"),
503//! CacheConfig::default(),
504//! ));
505//!
506//! let b = StreamsBuilder::new();
507//! b.stream::<String, Order>(["orders"]) // keyed by region
508//! .group_by_key()
509//! .windowed_by(TimeWindows::of_size(60_000).grace(10_000))
510//! .aggregate(
511//! || Revenue {
512//! order_count: 0,
513//! gross_cents: 0,
514//! },
515//! |_region, order, acc| Revenue {
516//! order_count: acc.order_count + 1,
517//! gross_cents: acc.gross_cents + order.amount_cents,
518//! },
519//! "revenue-by-window",
520//! )
521//! .suppress(Suppressed::until_window_closes(BufferConfig::unbounded()))
522//! .to_stream()
523//! .to("revenue-per-window");
524//! let built = b.build("revenue-analytics").unwrap();
525//! assert_eq!(
526//! built.list_sink_topics(),
527//! vec!["revenue-per-window".to_string()]
528//! );
529//! ```
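530//!
//! Non-windowed aggregations are just as easy to exercise broker-free: this word count
//! runs under [`TopologyTestDriver`] and checks both the running count emitted for each
//! record and the final value held in the materialized store:
//!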
531//! ```
532//! use crabka_client_streams::{I64Serde, StreamsBuilder, StringSerde, TopologyTestDriver};
533//!
534//! // Build a word-count topology: group by key, count, forward to "out".
535//! let b = StreamsBuilder::new();
536//! b.stream::<String, String>(["in"])
537//! .group_by_key()
538//! .count("counts")
539//! .to_stream()
540//! .to("out");
541//! let built = b.build("word-count").unwrap();
542//!
543//! // Drive it broker-free with TopologyTestDriver.
544//! let mut driver = TopologyTestDriver::new(&built).unwrap();
545//! for word in ["a", "a", "b"] {
546//! driver.pipe_input(
547//! "in",
548//! (StringSerde, StringSerde),
549//! Some(word.to_string()),
550//! word.to_string(),
551//! 0,
552//! );
553//! }
554//!
555//! // The stream output carries the running count per key.
556//! assert_eq!(
557//! driver.read_output("out", (StringSerde, I64Serde)),
558//! Some((Some("a".to_string()), 1)),
559//! );
560//! assert_eq!(
561//! driver.read_output("out", (StringSerde, I64Serde)),
562//! Some((Some("a".to_string()), 2)),
563//! );
564//! assert_eq!(
565//! driver.read_output("out", (StringSerde, I64Serde)),
566//! Some((Some("b".to_string()), 1)),
567//! );
568//!
569//! // The materialized store holds the final count per key.
570//! assert_eq!(
571//! driver.store_get::<String, i64>("counts", &"a".to_string()),
572//! Some(2)
573//! );
574//! assert_eq!(
575//! driver.store_get::<String, i64>("counts", &"b".to_string()),
576//! Some(1)
577//! );
578//! ```
579//!
580//! ### Applied: projecting Avro records
581//!
582//! A realistic stateless projection over compound **Avro** types — keep paid
583//! orders and bill each into a summary (a nested `Vec` of line items, an
584//! `Option`, and an enum, all carried as one Avro record per topic):
585//!
586//! ```
587//! use apache_avro::AvroSchema;
588//! use crabka_client_streams::{DefaultSerde, SchemaSerde, StreamsBuilder};
589//! use crabka_schema_serde::cache::{CacheConfig, SchemaCache};
590//! use crabka_schema_serde::format::avro::AvroSerde;
591//! use crabka_schema_serde::{RegistryClient, set_default_registry};
592//! use serde::{Deserialize, Serialize};
593//!
594//! #[derive(Clone, Serialize, Deserialize, AvroSchema)]
595//! struct Order {
596//! order_id: String,
597//! status: OrderStatus,
598//! lines: Vec<LineItem>,
599//! coupon: Option<String>,
600//! }
601//! #[derive(Clone, Serialize, Deserialize, AvroSchema)]
602//! struct LineItem {
603//! sku: String,
604//! quantity: i32,
605//! unit_price_cents: i64,
606//! }
607//! #[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize, AvroSchema)]
608//! enum OrderStatus {
609//! Placed,
610//! Paid,
611//! Shipped,
612//! Cancelled,
613//! }
614//! #[derive(Clone, Serialize, Deserialize, AvroSchema)]
615//! struct OrderSummary {
616//! order_id: String,
617//! item_count: i64,
618//! total_cents: i64,
619//! }
620//!
621//! impl DefaultSerde for Order {
622//! type Serde = SchemaSerde<Order, AvroSerde<Order>>;
623//! }
624//! impl DefaultSerde for OrderSummary {
625//! type Serde = SchemaSerde<OrderSummary, AvroSerde<OrderSummary>>;
626//! }
627//!
628//! set_default_registry(SchemaCache::new(
629//! RegistryClient::new("http://localhost:8081"),
630//! CacheConfig::default(),
631//! ));
632//!
633//! let b = StreamsBuilder::new();
634//! b.stream::<String, Order>(["orders"])
635//! .filter(|_id, o| o.status == OrderStatus::Paid)
636//! .map_values(|o: &Order| OrderSummary {
637//! order_id: o.order_id.clone(),
638//! item_count: i64::try_from(o.lines.len()).unwrap_or(i64::MAX),
639//! total_cents: o
640//! .lines
641//! .iter()
642//! .map(|l| i64::from(l.quantity) * l.unit_price_cents)
643//! .sum(),
644//! })
645//! .to("order-summaries");
646//! let built = b.build("order-billing").unwrap();
647//! assert_eq!(
648//! built.list_sink_topics(),
649//! vec!["order-summaries".to_string()]
650//! );
651//! ```
652//!
653//! ## Punctuation (timers)
654//!
655//! A Processor-API node can register **punctuators** — periodic callbacks — via
656//! [`ProcessorContext::schedule`]`(interval, `[`PunctuationType`]`, `[`Punctuator`]`)`,
657//! typically from `init`. A [`Punctuator`] is a trait object (like [`Processor`])
658//! that on each fire receives a `ProcessorContext` positioned at the scheduling node,
659//! so it may `forward(...)` records downstream and read/write state stores; share
660//! mutable state with the owning processor via `Arc<Mutex<_>>`. `schedule` returns a
661//! [`Cancellable`] (`.cancel()` stops it). Two clocks drive firing:
662//!
663//! - [`PunctuationType::StreamTime`] — driven by the task's observed max record
664//! timestamp (deterministic; advances as records are piped).
665//! - [`PunctuationType::WallClockTime`] — driven by the system clock between polls
666//! (in tests, by [`TopologyTestDriver::advance_wall_clock_time`]).
667//!
668//! Both fire **at most once per driving action**, passing the **current** time
669//! (stream-time / wall-clock) to `punctuate`; a schedule that has fallen more than one
670//! interval behind resyncs ahead rather than replaying every missed boundary. A
671//! stream-time schedule first fires on the first record; a wall-clock schedule first
672//! fires one interval after it was scheduled. (Punctuation is invisible in the wire
673//! topology — it is purely runtime behavior; these semantics match the JVM
674//! `TopologyTestDriver`.)
675//!
676//! ```
677//! use async_trait::async_trait;
678//! use crabka_client_streams::{
679//! I64Serde, NodeHandle, Processor, ProcessorContext, PunctuationType, Punctuator, Record,
680//! StringSerde, Topology, TopologyTestDriver,
681//! };
682//! use std::time::Duration;
683//!
684//! // A punctuator that forwards the fire timestamp downstream.
685//! struct Emit;
686//! #[async_trait]
687//! impl Punctuator<String, i64> for Emit {
688//! async fn punctuate(&mut self, ctx: &mut ProcessorContext<'_, '_, String, i64>, ts: i64) {
689//! ctx.forward(Record::new(None, ts, ts));
690//! }
691//! }
692//! // A processor that schedules `Emit` every 10ms of stream-time (and drops records).
693//! struct Scheduler;
694//! #[async_trait]
695//! impl Processor<String, String, String, i64> for Scheduler {
696//! async fn init(&mut self, ctx: &mut ProcessorContext<'_, '_, String, i64>) {
697//! ctx.schedule(Duration::from_millis(10), PunctuationType::StreamTime, Emit);
698//! }
699//! async fn process(
700//! &mut self,
701//! _ctx: &mut ProcessorContext<'_, '_, String, i64>,
702//! _r: Record<String, String>,
703//! ) {
704//! }
705//! }
706//!
707//! let mut topo = Topology::new();
708//! let src: NodeHandle<String, String> = topo.add_source("src", ["in"]);
709//! let p = topo.add_processor("p", || Scheduler, [&src]);
710//! topo.add_sink("out", "out", [&p]);
711//! let built = topo.build("app").unwrap();
712//!
713//! let mut driver = TopologyTestDriver::new(&built).unwrap();
714//! // Stream-time advances with each record's timestamp; the punctuator fires once per
715//! // crossed 10ms boundary, stamped with the CURRENT stream-time (5 is skipped).
716//! for ts in [0_i64, 5, 10] {
717//! driver.pipe_input(
718//! "in",
719//! (StringSerde, StringSerde),
720//! Some("k".to_string()),
721//! "v".to_string(),
722//! ts,
723//! );
724//! }
725//! assert_eq!(
726//! driver.read_output("out", (StringSerde, I64Serde)),
727//! Some((None, 0_i64))
728//! );
729//! assert_eq!(
730//! driver.read_output("out", (StringSerde, I64Serde)),
731//! Some((None, 10_i64))
732//! );
733//! assert_eq!(driver.read_output("out", (StringSerde, I64Serde)), None);
734//! ```
735//!
736//! ## Running an app (`KafkaStreams`)
737//!
738//! Once built, run a topology against a broker with the managed runtime — it
739//! joins the streams group, fetches its assigned partitions, processes records,
740//! produces to sink topics, and commits offsets (at-least-once):
741//!
742//! ```no_run
743//! use async_trait::async_trait;
744//! use crabka_client_streams::{
745//! KafkaStreams, NodeHandle, Processor, ProcessorContext, Record, Topology,
746//! };
747//!
748//! struct Upper;
749//! #[async_trait]
750//! impl Processor<String, String, String, String> for Upper {
751//! async fn process(
752//! &mut self,
753//! ctx: &mut ProcessorContext<'_, '_, String, String>,
754//! r: Record<String, String>,
755//! ) {
756//! ctx.forward(Record::new(r.key, r.value.to_uppercase(), r.timestamp));
757//! }
758//! }
759//!
760//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
761//! let mut topo = Topology::new();
762//! let src: NodeHandle<String, String> = topo.add_source("src", ["input-topic"]);
763//! let up = topo.add_processor("up", || Upper, [&src]);
764//! topo.add_sink("out", "output-topic", [&up]);
765//! let built = topo.build("my-app")?;
766//!
767//! let mut streams = KafkaStreams::builder()
768//! .bootstrap("localhost:9092")
769//! .application_id("my-app")
770//! .topology(built)
771//! .build()
772//! .await?;
773//! // The app runs in the background until it is closed.
774//! streams.close().await?;
775//! # Ok(())
776//! # }
777//! ```
778//!
779//! ## State stores & backends
780//!
781//! The execution path is **async**: [`Processor::process`](processor::Processor::process)
782//! is an `async fn`, and a processor reads/writes its connected state store with
783//! `ctx.get_state_store::<K, V>(name).get(&k).await` / `.put(k, v).await`.
784//!
785//! State stores are **pluggable** via a byte-level backend. A
786//! [`KeyValueStore`] is a typed view over a [`KeyValueBytesStore`] backend
787//! selected by [`StoreBackend`]: `InMemory` (a `BTreeMap`; the default and the
788//! test backend) or `Turso` (a pure-Rust `SQLite` engine persisting under a state
789//! dir, used by the managed runtime). The backend is a *materialized cache* — the
790//! changelog topic is the source of truth, so on assignment the store is rebuilt
791//! from the changelog (clean-slate replay), and a missing/corrupt local store is
792//! recovered by replay rather than data loss. Select it on the builder:
793//! `KafkaStreams::builder().store_backend(StoreBackend::Turso { state_dir })`.
794//!
795//! ## Interactive Queries
796//!
797//! Read a running instance's local state stores from outside the topology with
798//! [`KafkaStreams::key_value_store`], [`KafkaStreams::window_store`], and
799//! [`KafkaStreams::session_store`]. Each returns a typed, read-only view —
800//! [`ReadOnlyKeyValueStore`] / [`ReadOnlyWindowStore`] / [`ReadOnlySessionStore`]
801//! — whose accessors round-trip through the running supervisor:
802//!
803//! ```no_run
804//! # use crabka_client_streams::{KafkaStreams, StringSerde, I64Serde};
805//! # async fn example(streams: KafkaStreams) -> Result<(), Box<dyn std::error::Error>> {
806//! let counts = streams
807//! .key_value_store("counts", StringSerde, I64Serde)
808//! .await?;
809//! let n: Option<i64> = counts.get(&"alice".to_string()).await?;
810//! let top = counts.range(&"a".to_string(), &"m".to_string()).await?;
811//! let total = counts.approximate_num_entries().await?;
812//! # let _ = (n, top, total);
813//! # Ok(())
814//! # }
815//! ```
816//!
817//! Queries reach only the **local active** stores (a composite read across every
818//! partition this instance owns), matching the JVM default `StoreQueryParameters`.
819//! [`ReadOnlyKeyValueStore`] exposes `get` / `range` (inclusive) / `all` /
820//! `approximate_num_entries`; [`ReadOnlyWindowStore`] exposes `fetch_single` /
821//! `fetch`; [`ReadOnlySessionStore`] exposes `fetch`. Failures surface as
822//! [`StreamsClientError::InteractiveQuery`] wrapping an [`IqError`]:
823//! [`IqError::StoreNotFound`] (no such store assigned here),
824//! [`IqError::WrongStoreKind`] (queried the wrong store kind),
825//! [`IqError::NotRunning`] (instance closed), or
826//! [`IqError::RebalanceInProgress`] (no tasks assigned yet — retry).
827//!
828//! ## Exactly-once (EOS v2)
829//!
830//! The runtime's delivery guarantee is set by
831//! [`ProcessingGuarantee`]: [`AtLeastOnce`](ProcessingGuarantee::AtLeastOnce)
832//! (the default — produce, then commit source offsets; a crash mid-cycle may
833//! replay) or [`ExactlyOnceV2`](ProcessingGuarantee::ExactlyOnceV2) (KIP-447
834//! `exactly_once_v2`). Under EOS-v2 the [`StreamThread`] runs **one Kafka
835//! transaction per commit interval** over a single transactional producer
836//! (`transactional.id = <application.id>-<thread>`): it `begin`s the txn,
837//! produces sink **and** changelog records into it, commits the consumed source
838//! offsets *inside* the same transaction (`send_offsets_to_transaction`), and
839//! `commit`s — so output, changelog, and offsets land atomically. On any error
840//! in the cycle it `abort`s, rewinds the source offsets, and rolls back the state
841//! stores by wiping them and restoring them from the **committed** changelog
842//! (`read_committed`). Because state-store restore under EOS always reads
843//! `read_committed`, aborted changelog writes are never replayed.
844//!
845//! Committed source offsets are surfaced through `OffsetFetch` once the
846//! transaction's COMMIT marker lands, so a restarted instance resumes from the
847//! committed offset (the committed input is processed **exactly once across the
848//! restart** — it is not re-read/double-counted), rebuilding its stores from the
849//! committed changelog.
850//!
851//! [`StreamThread`]: runtime
852//!
853//! ```no_run
854//! use crabka_client_streams::{KafkaStreams, NodeHandle, ProcessingGuarantee, Topology};
855//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
856//! let mut topo = Topology::new();
857//! let src: NodeHandle<String, String> = topo.add_source("src", ["in"]);
858//! topo.add_sink("out", "out", [&src]);
859//! let built = topo.build("my-app")?;
860//!
861//! // Opt into exactly-once: output + changelog + source offsets commit atomically.
862//! let mut streams = KafkaStreams::builder()
863//! .bootstrap("localhost:9092")
864//! .application_id("my-app")
865//! .topology(built)
866//! .processing_guarantee(ProcessingGuarantee::ExactlyOnceV2)
867//! .build()
868//! .await?;
869//! streams.close().await?;
870//! # Ok(())
871//! # }
872//! ```
873//!
874//! ## Versioned tables (KIP-889)
875//!
876//! `builder.table(..., Materialized::as_versioned(name, history_retention_ms))`
877//! materializes a table into a versioned key-value store, so out-of-order
878//! records are recorded as historical versions without clobbering the latest,
879//! and point-in-time reads are available via `get_as_of`.
880#![doc(html_root_url = "https://docs.rs/crabka-client-streams/0.3.5")]
881
882pub mod dsl;
883mod error;
884pub mod membership;
885pub mod processor;
886pub mod runtime;
887pub mod store;
888pub mod streams_app;
889pub mod test_driver;
890pub mod topology;
891
892#[doc(hidden)]
893pub use async_trait::async_trait as __async_trait;
894
895pub use dsl::{
896 BranchedStream, BufferConfig, CogroupedKStream, GlobalKTable, Grouped, JoinWindows, Joined,
897 KGroupedStream, KStream, KTable, Materialized, Repartitioned, SessionWindowedCogroupedStream,
898 SessionWindowedKGroupedStream, SessionWindowedSerde, SessionWindows,
899 SlidingWindowedCogroupedStream, SlidingWindowedKGroupedStream, SlidingWindows, StreamJoined,
900 StreamsBuilder, Suppressed, TimeWindowedCogroupedStream, TimeWindowedKGroupedStream,
901 TimeWindowedSerde, TimeWindows, VersionedConfig, Window, Windowed,
902};
903pub use error::StreamsClientError;
904pub use membership::{
905 SchemaPrewarm, StreamsAssignment, StreamsEvent, StreamsMembership, StreamsStatus,
906 TaskAssignment, TaskOffsetTracker, TopicPartition,
907};
908pub use processor::schema_serde::SchemaSerde;
909pub use processor::{
910 BytesSerde, Cancellable, Consumed, DefaultSerde, FixedKeyProcessor, FixedKeyProcessorContext,
911 FixedKeyProcessorSupplier, FixedKeyRecord, I64Serde, Processor, ProcessorContext,
912 ProcessorError, ProcessorSupplier, Produced, PunctuationType, Punctuator, Record,
913 RecordContext, Serde, SerdeError, StringSerde,
914};
915pub use runtime::eos::ProcessingGuarantee;
916pub use runtime::iq::IqError;
917pub use runtime::iqv2::{
918 FailureReason, KeyQuery, MultiVersionedKeyQuery, Position, PositionBound, Query, QueryResult,
919 RangeQuery, StateQuery, StateQueryRequest, StateQueryResult, VersionedKeyQuery, WindowKeyQuery,
920 WindowRangeQuery,
921};
922pub use runtime::{
923 KafkaStreams, KafkaStreamsState, ReadOnlyKeyValueStore, ReadOnlySessionStore,
924 ReadOnlyWindowStore,
925};
926pub use store::iq::StoreKind;
927pub use store::versioned::VersionedRecord;
928pub use store::{KeyValueBytesStore, KeyValueStore, StateStore, StoreBackend};
929pub use streams_app::StreamsApp;
930pub use test_driver::TopologyTestDriver;
931pub use topology::{BuiltTopology, NodeHandle, Topology, TopologyError};