crabka-client-streams 0.3.2

KIP-1071 Kafka Streams rebalance-protocol client for Apache Kafka in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
//! Kafka Streams-compatible client runtime for Crabka.
//!
//! `crabka-client-streams` provides three layers that can be used independently:
//!
//! - [`StreamsBuilder`] builds JVM-compatible KStream/KTable topologies for
//!   common application code: map/filter chains, aggregations, joins, windows,
//!   suppression, global tables, and custom Processor-API nodes.
//! - [`Topology`] is the typed Processor API for applications that want explicit
//!   source, processor, sink, and state-store wiring.
//! - [`KafkaStreams`] runs a built topology against a Kafka-compatible broker by
//!   joining a KIP-1071 streams group, processing assigned input partitions,
//!   producing sink records, restoring changelog-backed stores, and serving local
//!   interactive queries.
//!
//! For broker-free tests, [`TopologyTestDriver`] executes the same built topology
//! in process. The driver is the fastest way to exercise business logic and state
//! stores before running with [`KafkaStreams`].
//!
//! ## Quick start
//!
//! ```no_run
//! use crabka_client_streams::{StreamsEvent, StreamsMembership, StringSerde, Topology};
//!
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! let mut topo = Topology::new();
//! let src = topo.add_source("src", ["input-topic"], (StringSerde, StringSerde));
//! topo.add_sink("snk", "output-topic", [&src], (StringSerde, StringSerde));
//! let built = topo.build("my-application-id")?;
//!
//! let mut membership = StreamsMembership::builder()
//!     .bootstrap("localhost:9092")
//!     .group_id("my-application-id")
//!     .topology(std::sync::Arc::new(built))
//!     .build()
//!     .await?;
//!
//! loop {
//!     match membership.next_event().await? {
//!         StreamsEvent::Assigned(a) => {
//!             for task in &a.active {
//!                 println!("active task {} → {:?}", task.subtopology_id, task.source_topic_partitions);
//!             }
//!         }
//!         StreamsEvent::NotReady(statuses) => println!("not ready: {statuses:?}"),
//!         StreamsEvent::Fenced => println!("rejoined after fence"),
//!     }
//! }
//! # }
//! ```
//!
//! ## Processor API
//!
//! Define a typed topology, then test it with the broker-free [`TopologyTestDriver`]:
//!
//! ```
//! use async_trait::async_trait;
//! use crabka_client_streams::{Processor, ProcessorContext, Record, StringSerde, Topology, TopologyTestDriver};
//!
//! struct Upper;
//! #[async_trait]
//! impl Processor<String, String, String, String> for Upper {
//!     async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, String, String>, r: Record<String, String>) {
//!         ctx.forward(Record::new(r.key, r.value.to_uppercase(), r.timestamp));
//!     }
//! }
//!
//! let mut topo = Topology::new();
//! let src = topo.add_source("src", ["in"], (StringSerde, StringSerde));
//! let up = topo.add_processor("up", || Upper, [&src]);
//! topo.add_sink("out", "out", [&up], (StringSerde, StringSerde));
//! let built = topo.build("my-app").unwrap();
//!
//! let mut driver = TopologyTestDriver::new(&built).unwrap();
//! driver.pipe_input("in", (StringSerde, StringSerde), Some("k".to_string()), "hello".to_string(), 0);
//! assert_eq!(
//!     driver.read_output("out", (StringSerde, StringSerde)),
//!     Some((Some("k".to_string()), "HELLO".to_string())),
//! );
//! ```
//!
//! Nodes are wired by handle, not by string name, so a mis-typed edge is a
//! **compile error** rather than a `build()`-time failure:
//!
//! ```compile_fail
//! use crabka_client_streams::{I64Serde, StringSerde, Topology};
//!
//! let mut topo = Topology::new();
//! // `src` produces Record<String, String>:
//! let src = topo.add_source("src", ["in"], (StringSerde, StringSerde));
//! // but this sink expects Record<String, i64> — won't compile:
//! topo.add_sink("out", "out", [&src], (StringSerde, I64Serde));
//! ```
//!
//! ## State stores
//!
//! Processors can persist and restore keyed state via a named [`KeyValueStore`].
//! The store is attached to the topology with `add_state_store`, and accessed
//! inside `process` via [`ProcessorContext::get_state_store`].
//!
//! ```
//! use async_trait::async_trait;
//! use crabka_client_streams::{
//!     I64Serde, Processor, ProcessorContext, Record, StringSerde, Topology,
//!     TopologyTestDriver,
//! };
//!
//! struct Counter;
//! #[async_trait]
//! impl Processor<String, String, String, i64> for Counter {
//!     async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, String, i64>, r: Record<String, String>) {
//!         let n = {
//!             let s = ctx.get_state_store::<String, i64>("counts").unwrap();
//!             let n = s.get(&r.value).await.unwrap_or(0) + 1;
//!             s.put(r.value.clone(), n).await;
//!             n
//!         };
//!         ctx.forward(Record::new(Some(r.value), n, r.timestamp));
//!     }
//! }
//!
//! let mut topo = Topology::new();
//! let src = topo.add_source("src", ["in"], (StringSerde, StringSerde));
//! let c = topo.add_processor("c", || Counter, [&src]);
//! topo.add_state_store("counts", StringSerde, I64Serde, [c.name()]);
//! topo.add_sink("out", "out", [&c], (StringSerde, I64Serde));
//! let built = topo.build("app").unwrap();
//!
//! let mut driver = TopologyTestDriver::new(&built).unwrap();
//! driver.pipe_input("in", (StringSerde, StringSerde), None, "a".to_string(), 0);
//! driver.pipe_input("in", (StringSerde, StringSerde), None, "a".to_string(), 1);
//! assert_eq!(
//!     driver.read_output("out", (StringSerde, I64Serde)),
//!     Some((Some("a".to_string()), 1_i64)),
//! );
//! assert_eq!(
//!     driver.read_output("out", (StringSerde, I64Serde)),
//!     Some((Some("a".to_string()), 2_i64)),
//! );
//! assert_eq!(driver.store_get::<String, i64>("counts", &"a".to_string()), Some(2_i64));
//! ```
//!
//! ## DSL (KStream/KTable)
//!
//! [`StreamsBuilder`] is the high-level DSL entry point. It wires a topology
//! from source streams through stateless transforms, aggregations, and sinks
//! without writing explicit [`Processor`] implementations. The resulting
//! [`BuiltTopology`] is interchangeable with the Processor-API variant — run it
//! with [`TopologyTestDriver`] for broker-free testing or [`KafkaStreams`] for
//! production.
//!
//! A [`KTable`] is internally a *change stream*: each record carries a
//! `Change { old_value, new_value }` and `filter` emits tombstones (a row whose
//! key stops matching is deleted downstream with `new_value = None`).
//! [`KStream::to_table`] materializes a stream into a [`KTable`] backed by a
//! named [`Materialized`] store. [`KTable::to_stream`] forwards update records
//! and drops tombstones from the output stream.
//!
//! [`KStream::join_table`] and [`KStream::left_join_table`] join a stream against
//! a **materialized** `KTable`: the stream side drives, and for each record the
//! table store is looked up by key. An inner join emits only when the table has a
//! matching entry; a left join always emits (with `None` as the table value when
//! absent). The stream must be **copartitioned** with the table (same key serde
//! and partition count); a key-changing stream must be `.repartition(..)`-ed
//! before joining — the join itself inserts no implicit repartition. (The plain
//! [`KStream::join`]/[`KStream::left_join`] names are the windowed *stream-stream*
//! join below — Rust can't overload by argument type as the JVM does.)
//!
//! [`KTable::join`], [`KTable::left_join`], and [`KTable::outer_join`] join two
//! **materialized** `KTable`s. Unlike the stream-table join, a change on *either*
//! side recomputes the join: the changed side re-reads the other side's current
//! value from its store and forwards a `Change` (a tombstone when the joined row
//! stops existing). Inner emits only when both sides hold a value; left emits
//! whenever the left side is present; outer emits whenever either side is. The two
//! source topics are declared as a **copartition group**, and the result is an
//! unmaterialized `KTable` (no result store/changelog — materialize a downstream op
//! to persist it).
//!
//! ## Foreign-key joins
//!
//! [`KTable::join_on_foreign_key`] and [`KTable::left_join_on_foreign_key`]
//! (KIP-213) join two `KTable`s on a **foreign key** rather than the primary key:
//! for each left row, an `fk_extractor(&leftValue)` selects the foreign key, which
//! looks up a row in the right table. The relationship is **many-to-one** — many
//! left rows can reference the same right row, and a change on *either* side
//! re-evaluates every affected pair: a left-value change re-selects the foreign
//! key, and a right-row change re-emits for every left row currently subscribed to
//! that foreign key. **Inner** emits `joiner(&left, &right)` only when the foreign
//! row exists (a foreign key with no match retracts with a tombstone); **left**
//! emits for every left row, passing `None` for the foreign value on a miss.
//!
//! Both input tables must be **materialized source tables** — built with
//! [`StreamsBuilder::table`] (the join reads each side's store and serdes). The
//! result is an **unmaterialized** `KTable` (no result store/changelog; materialize
//! a downstream op to persist it). Because the foreign key differs from the primary
//! key, the join cannot be copartitioned directly; it lowers to the KIP-213
//! two-subtopology graph — a *subscription registration* repartition topic (keyed
//! by foreign key), a *subscription response* repartition topic (keyed back by
//! primary key), and a subscription state store that tracks which primary keys
//! subscribe to each foreign key — all created and copartitioned automatically.
//!
//! ```no_run
//! use crabka_client_streams::{StreamsBuilder, StringSerde};
//!
//! let builder = StreamsBuilder::new();
//! // `a`: primaryKey -> foreignKey ("A"); `b`: foreignKey -> value ("X").
//! let a = builder.table::<String, String>("a", "sa");
//! let b = builder.table::<String, String>("b", "sb");
//! a.join_on_foreign_key(
//!     &b,
//!     |left: &String| left.clone(),                       // foreign-key extractor
//!     |left: &String, right: &String| format!("{left}{right}"), // joiner -> "AX"
//!     StringSerde,                                         // foreign-key serde
//! )
//! .to_stream()
//! .to("out");
//! drop(a);
//! drop(b);
//! let topology = builder.build("fk-app").unwrap();
//! # let _ = topology;
//! ```
//!
//! [`KGroupedStream::windowed_by`] turns a grouped stream into time-windowed
//! aggregations: `windowed_by(TimeWindows::of_size(..))` followed by `count`,
//! `reduce`, or `aggregate` yields a [`KTable`]`<`[`Windowed`]`<K>, V>`. [`TimeWindows`] are
//! tumbling (`of_size`) or hopping (`.advance_by(..)`); each record is aggregated
//! into every window it falls into, and a result is emitted on **every update**.
//! Add [`KTable::suppress`] with [`Suppressed::until_window_closes`] when the
//! application wants one final result after the window closes. The windowed store is a
//! [`Window`]-keyed store over the same pluggable backend, with a `compact,delete`
//! changelog (`retention.ms = size + grace + 1 day`). Read the windowed output
//! with [`TimeWindowedSerde`] (the key carries the window start).
//!
//! [`KGroupedStream::windowed_by_session`] groups records into data-driven
//! **session windows**: records for a key form one session `[start, end]` while
//! they stay within an inactivity [`SessionWindows`] gap. Terminal `count` /
//! `reduce` / `aggregate` (the last taking a session merger) yield a
//! [`KTable`]`<`[`Windowed`]`<K>, V>`. Each record merges every session within the
//! gap into one `[minStart, maxEnd]` session — emitting a tombstone for each
//! merged-away session and the new merged session (KIP session semantics,
//! emit-on-update). The session store keys by `key‖end‖start` (a third typed store
//! over the pluggable backend); read the output with [`SessionWindowedSerde`].
//!
//! [`KTable::suppress`]`(`[`Suppressed`]`::until_window_closes(`[`BufferConfig`]`::unbounded()))`
//! turns a windowed table's emit-on-update change-stream into **final results**: it
//! buffers each window's updates and forwards the window's final value exactly once,
//! when stream-time passes `window.end + grace` (the grace comes from the upstream
//! windowed/session aggregation). [`Suppressed::until_time_limit`] is the
//! rate-limiter variant for *any* table — it emits at most one update per key per
//! wait (stream-time), a newer record resetting the timer.
//!
//! The buffer is a **registered, durable state store** (a time-ordered
//! `SuppressBytesStore` keyed by the serialized record key). With logging on (the
//! default) it writes a **JVM-byte-exact** changelog — `BufferValue` +
//! `ProcessorRecordContext` value, a plain `cleanup.policy=compact` topic
//! `app-KTABLE-SUPPRESS-STATE-STORE-<n>-changelog` — and restores the buffered
//! records on restart via the same machinery as every other store, so windows that
//! were still buffered re-emit on close after a restart. [`Suppressed::with_logging_disabled`]
//! keeps the buffer in memory only (no changelog topic). The serdes reach the store
//! from the producing op (the windowed/session aggregation or [`StreamsBuilder::table`]).
//!
//! The buffer is bounded by [`BufferConfig`]: start from [`BufferConfig::unbounded`]`()`
//! and cap it with `.with_max_records(n)` or
//! [`.with_max_bytes(n)`](BufferConfig::with_max_bytes) (bytes = serialized key +
//! value, summed). Exceeding a cap either shuts the task down (`shutDownWhenFull`,
//! the `until_window_closes` default) or — with the eager variants
//! `BufferConfig::max_records(n)` / [`BufferConfig::max_bytes`], or with
//! [`BufferConfig::emit_early_when_full`] — evicts and emits the oldest buffered
//! record (`emitEarlyWhenFull`).
//!
//! [`KStream::join`], [`KStream::left_join`], and [`KStream::outer_join`] are the
//! windowed **stream-stream** joins: two streams join over a [`JoinWindows`] time
//! window, configured with [`StreamJoined`] serdes. Each side buffers its records
//! in its own `retainDuplicates` window store (so two records at the same time
//! both survive); a record from one side joins every record on the other side
//! within `[t - before, t + after]`, emitting `joiner(a, b)` at `max(ts)`. The two
//! window-store changelogs use `cleanup.policy=delete` (`retention.ms = before +
//! after + grace + 1 day`), and the two source topics form a copartition group. An
//! inner join emits only on a match; **left**/**outer** additionally emit the
//! null-padded result for a record that finds no match, once its window has closed
//! (KIP-633 stream-time-driven emission — there is no wall-clock throttle). Left and
//! outer joins buffer the as-yet-unmatched records in a shared `KSTREAM-OUTERSHARED-`
//! KV store (a compact changelog) and rename their per-side processors to
//! `KSTREAM-OUTERTHIS-`/`KSTREAM-OUTEROTHER-` to match the JVM. As with the other
//! joins, a key-changing stream must `.repartition(..)` before joining.
//!
//! [`StreamsBuilder::global_table`] sources a [`GlobalKTable`]: a **fully-replicated**
//! lookup table. Every application instance reads *all* partitions of the source
//! topic into one shared global store, so the source topic itself is the truth —
//! there is **no copartitioning, no repartition, and no changelog** (the global
//! store is rebuilt from the source on startup). The store is *invisible in the
//! wire topology* (no subtopology of its own), though its global source node still
//! consumes a node-group index during grouping (so declaring `global_table` before
//! `stream` shifts the stream subtopology id). [`KStream::join_global`] /
//! [`KStream::left_join_global`] join a stream to it by a **per-record-derived
//! key** — `key_mapper(&streamKey, &streamValue)` selects the global key (which may
//! differ from the stream key) — and emit `joiner(&streamValue, &globalValue)` keyed
//! by the *stream* key. An inner `join_global` skips a record on a store miss; a
//! `left_join_global` always emits, passing `None` for the global side. Because the
//! store is fully replicated, any record can look up any key on every instance.
//! The runtime's global consumer **bootstraps** the store — draining every partition
//! of the source topic to end-of-log — *before* any task begins processing, so the
//! first joined record already sees the complete global table.
//!
//! [`KStream::process`] and [`KStream::process_values`] (KIP-820) drop a custom
//! Processor-API node into a DSL pipeline: a user-written [`Processor`] (for
//! `process`) or [`FixedKeyProcessor`] (for `process_values`) that reads and writes
//! state stores connected by name. Register the store first with
//! [`StreamsBuilder::add_state_store`] (a compact-changelog [`KeyValueStore`]), then
//! pass its name to the `process`/`process_values` call that uses it — the named
//! store is attached to that node and its `app-<store>-changelog` topic appears in
//! the wire. `process` may rewrite the record key, so its result is
//! **key-changing**: a downstream `group_by_key`/join inserts a repartition.
//! `process_values` is **fixed-key** — it can change the value but not the key — so
//! it carries the upstream key lineage and forces **no** repartition. That guarantee
//! is structural: a [`FixedKeyProcessor`] only ever receives and forwards a
//! [`FixedKeyRecord`], whose key is fixed from the input and preserved through
//! [`FixedKeyRecord::with_value`]; the context's only `forward` re-attaches that key,
//! so the processor cannot emit a different one. (An `add_state_store` store that no
//! `process`/`process_values` connects is simply never instantiated — no changelog,
//! no runtime store.)
//!
//! ### Enriching a stream with a fully replicated table
//!
//! `GlobalKTable` is useful for reference data such as customer profiles,
//! product catalogs, or fraud watchlists where every app instance should be able
//! to look up any key without repartitioning the stream:
//!
//! ```
//! use crabka_client_streams::{StreamsBuilder, StringSerde};
//!
//! let b = StreamsBuilder::new();
//! let customers = b.global_table::<String, String>("customers", "customers-by-id");
//!
//! b.stream::<String, String>(["orders"])
//!     .left_join_global(
//!         &customers,
//!         |_order_id, order| order.split(':').next().unwrap_or("").to_string(),
//!         |order, customer| format!("{order}|customer={}", customer.map_or("unknown", |v| v)),
//!     )
//!     .to("enriched-orders");
//!
//! drop(customers);
//! let built = b.build("orders-enricher").unwrap();
//! assert_eq!(built.list_source_topics(), vec!["orders".to_string()]);
//! ```
//!
//! ### Final windowed counts
//!
//! Windowed aggregations emit on every update by default. Add `suppress` when
//! downstream systems should receive only the final value after the window grace
//! has elapsed:
//!
//! ```
//! use crabka_client_streams::{
//!     BufferConfig, StreamsBuilder, Suppressed, TimeWindows,
//! };
//!
//! let b = StreamsBuilder::new();
//! b.stream::<String, String>(["clicks"])
//!     .group_by_key()
//!     .windowed_by(TimeWindows::of_size(60_000).grace(10_000))
//!     .count("click-counts")
//!     .suppress(Suppressed::until_window_closes(BufferConfig::unbounded()))
//!     .to_stream()
//!     .to("click-counts-final");
//!
//! let built = b.build("click-analytics").unwrap();
//! assert_eq!(built.list_sink_topics(), vec!["click-counts-final".to_string()]);
//! ```
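//!
//! ### Word count with `TopologyTestDriver`
//!
//! A grouped `count` materializes a store and emits the running total per key; the
//! test driver exposes both the output topic and the store contents:
//!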
//! ```
//! use crabka_client_streams::{
//!     I64Serde, StreamsBuilder, StringSerde, TopologyTestDriver,
//! };
//!
//! // Build a word-count topology: group by key, count, forward to "out".
//! let b = StreamsBuilder::new();
//! b.stream::<String, String>(["in"])
//!     .group_by_key()
//!     .count("counts")
//!     .to_stream()
//!     .to("out");
//! let built = b.build("word-count").unwrap();
//!
//! // Drive it broker-free with TopologyTestDriver.
//! let mut driver = TopologyTestDriver::new(&built).unwrap();
//! for word in ["a", "a", "b"] {
//!     driver.pipe_input(
//!         "in",
//!         (StringSerde, StringSerde),
//!         Some(word.to_string()),
//!         word.to_string(),
//!         0,
//!     );
//! }
//!
//! // The stream output carries the running count per key.
//! assert_eq!(
//!     driver.read_output("out", (StringSerde, I64Serde)),
//!     Some((Some("a".to_string()), 1)),
//! );
//! assert_eq!(
//!     driver.read_output("out", (StringSerde, I64Serde)),
//!     Some((Some("a".to_string()), 2)),
//! );
//! assert_eq!(
//!     driver.read_output("out", (StringSerde, I64Serde)),
//!     Some((Some("b".to_string()), 1)),
//! );
//!
//! // The materialized store holds the final count per key.
//! assert_eq!(driver.store_get::<String, i64>("counts", &"a".to_string()), Some(2));
//! assert_eq!(driver.store_get::<String, i64>("counts", &"b".to_string()), Some(1));
//! ```
//!
//! ## Punctuation (timers)
//!
//! A Processor-API node can register **punctuators** — periodic callbacks — via
//! [`ProcessorContext::schedule`]`(interval, `[`PunctuationType`]`, `[`Punctuator`]`)`,
//! typically from `init`. A [`Punctuator`] is a trait object (like [`Processor`])
//! that on each fire receives a `ProcessorContext` positioned at the scheduling node,
//! so it may `forward(...)` records downstream and read/write state stores; share
//! mutable state with the owning processor via `Arc<Mutex<_>>`. `schedule` returns a
//! [`Cancellable`] (`.cancel()` stops it). Two clocks drive firing:
//!
//! - [`PunctuationType::StreamTime`] — driven by the task's observed max record
//!   timestamp (deterministic; advances as records are piped).
//! - [`PunctuationType::WallClockTime`] — driven by the system clock between polls
//!   (in tests, by [`TopologyTestDriver::advance_wall_clock_time`]).
//!
//! Both fire **at most once per driving action**, passing the **current** time
//! (stream-time / wall-clock) to `punctuate`; a schedule that has fallen more than one
//! interval behind resyncs ahead rather than replaying every missed boundary. A
//! stream-time schedule first fires on the first record; a wall-clock schedule first
//! fires one interval after it was scheduled. (Punctuation is invisible in the wire
//! topology — it is purely runtime behavior; these semantics match the JVM
//! `TopologyTestDriver`.)
//!
//! ```
//! use async_trait::async_trait;
//! use std::time::Duration;
//! use crabka_client_streams::{
//!     I64Serde, Processor, ProcessorContext, PunctuationType, Punctuator,
//!     Record, StringSerde, Topology, TopologyTestDriver,
//! };
//!
//! // A punctuator that forwards the fire timestamp downstream.
//! struct Emit;
//! #[async_trait]
//! impl Punctuator<String, i64> for Emit {
//!     async fn punctuate(&mut self, ctx: &mut ProcessorContext<'_, '_, String, i64>, ts: i64) {
//!         ctx.forward(Record::new(None, ts, ts));
//!     }
//! }
//! // A processor that schedules `Emit` every 10ms of stream-time (and drops records).
//! struct Scheduler;
//! #[async_trait]
//! impl Processor<String, String, String, i64> for Scheduler {
//!     async fn init(&mut self, ctx: &mut ProcessorContext<'_, '_, String, i64>) {
//!         ctx.schedule(Duration::from_millis(10), PunctuationType::StreamTime, Emit);
//!     }
//!     async fn process(&mut self, _ctx: &mut ProcessorContext<'_, '_, String, i64>, _r: Record<String, String>) {}
//! }
//!
//! let mut topo = Topology::new();
//! let src = topo.add_source("src", ["in"], (StringSerde, StringSerde));
//! let p = topo.add_processor("p", || Scheduler, [&src]);
//! topo.add_sink("out", "out", [&p], (StringSerde, I64Serde));
//! let built = topo.build("app").unwrap();
//!
//! let mut driver = TopologyTestDriver::new(&built).unwrap();
//! // Stream-time advances with each record's timestamp; the punctuator fires once per
//! // crossed 10ms boundary, stamped with the CURRENT stream-time (5 is skipped).
//! for ts in [0_i64, 5, 10] {
//!     driver.pipe_input("in", (StringSerde, StringSerde), Some("k".to_string()), "v".to_string(), ts);
//! }
//! assert_eq!(driver.read_output("out", (StringSerde, I64Serde)), Some((None, 0_i64)));
//! assert_eq!(driver.read_output("out", (StringSerde, I64Serde)), Some((None, 10_i64)));
//! assert_eq!(driver.read_output("out", (StringSerde, I64Serde)), None);
//! ```
//!
//! ## Running an app (`KafkaStreams`)
//!
//! Once built, run a topology against a broker with the managed runtime — it
//! joins the streams group, fetches its assigned partitions, processes records,
//! produces to sink topics, and commits offsets (at-least-once):
//!
//! ```no_run
//! use async_trait::async_trait;
//! use crabka_client_streams::{KafkaStreams, Processor, ProcessorContext, Record, StringSerde, Topology};
//!
//! struct Upper;
//! #[async_trait]
//! impl Processor<String, String, String, String> for Upper {
//!     async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, String, String>, r: Record<String, String>) {
//!         ctx.forward(Record::new(r.key, r.value.to_uppercase(), r.timestamp));
//!     }
//! }
//!
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! let mut topo = Topology::new();
//! let src = topo.add_source("src", ["input-topic"], (StringSerde, StringSerde));
//! let up = topo.add_processor("up", || Upper, [&src]);
//! topo.add_sink("out", "output-topic", [&up], (StringSerde, StringSerde));
//! let built = topo.build("my-app")?;
//!
//! let mut streams = KafkaStreams::builder()
//!     .bootstrap("localhost:9092")
//!     .application_id("my-app")
//!     .topology(built)
//!     .build()
//!     .await?;
//! // The app runs in the background until it is closed.
//! streams.close().await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## State stores & backends
//!
//! The execution path is **async**: [`Processor::process`]
//! is an `async fn`, and a processor reads/writes its connected state store with
//! `ctx.get_state_store::<K, V>(name).get(&k).await` / `.put(k, v).await`.
//!
//! State stores are **pluggable** via a byte-level backend. A
//! [`KeyValueStore`] is a typed view ([`KeyValueBytesStore`]) over a backend
//! selected by [`StoreBackend`]: `InMemory` (a `BTreeMap`; the default and the
//! test backend) or `Turso` (a pure-Rust `SQLite` engine persisting under a state
//! dir, used by the managed runtime). The backend is a *materialized cache* — the
//! changelog topic is the source of truth, so on assignment the store is rebuilt
//! from the changelog (clean-slate replay), and a missing/corrupt local store is
//! recovered by replay rather than data loss. Select it on the builder:
//! `KafkaStreams::builder().store_backend(StoreBackend::Turso { state_dir })`.
//!
//! ## Interactive Queries
//!
//! Read a running instance's local state stores from outside the topology with
//! [`KafkaStreams::key_value_store`], [`KafkaStreams::window_store`], and
//! [`KafkaStreams::session_store`]. Each returns a typed, read-only view —
//! [`ReadOnlyKeyValueStore`] / [`ReadOnlyWindowStore`] / [`ReadOnlySessionStore`]
//! — whose accessors round-trip through the running supervisor:
//!
//! ```no_run
//! # use crabka_client_streams::{KafkaStreams, StringSerde, I64Serde};
//! # async fn example(streams: KafkaStreams) -> Result<(), Box<dyn std::error::Error>> {
//! let counts = streams.key_value_store("counts", StringSerde, I64Serde).await?;
//! let n: Option<i64> = counts.get(&"alice".to_string()).await?;
//! let top = counts.range(&"a".to_string(), &"m".to_string()).await?;
//! let total = counts.approximate_num_entries().await?;
//! # let _ = (n, top, total);
//! # Ok(())
//! # }
//! ```
//!
//! Queries reach only the **local active** stores (a composite read across every
//! partition this instance owns), matching the JVM default `StoreQueryParameters`.
//! [`ReadOnlyKeyValueStore`] exposes `get` / `range` (inclusive) / `all` /
//! `approximate_num_entries`; [`ReadOnlyWindowStore`] exposes `fetch_single` /
//! `fetch`; [`ReadOnlySessionStore`] exposes `fetch`. Failures surface as
//! [`StreamsClientError::InteractiveQuery`] wrapping an [`IqError`]:
//! [`IqError::StoreNotFound`] (no such store assigned here),
//! [`IqError::WrongStoreKind`] (queried the wrong store kind),
//! [`IqError::NotRunning`] (instance closed), or
//! [`IqError::RebalanceInProgress`] (no tasks assigned yet — retry).
//!
//! ## Exactly-once (EOS v2)
//!
//! The runtime's delivery guarantee is set by
//! [`ProcessingGuarantee`]: [`AtLeastOnce`](ProcessingGuarantee::AtLeastOnce)
//! (the default — produce, then commit source offsets; a crash mid-cycle may
//! replay) or [`ExactlyOnceV2`](ProcessingGuarantee::ExactlyOnceV2) (KIP-447
//! `exactly_once_v2`). Under EOS-v2 the [`StreamThread`] runs **one Kafka
//! transaction per commit interval** over a single transactional producer
//! (`transactional.id = <application.id>-<thread>`): it `begin`s the txn,
//! produces sink **and** changelog records into it, commits the consumed source
//! offsets *inside* the same transaction (`send_offsets_to_transaction`), and
//! `commit`s — so output, changelog, and offsets land atomically. On any error
//! in the cycle it `abort`s, rewinds the source offsets, and rolls back state
//! stores by wiping them and restoring from the **committed** changelog. State-store
//! restore under EOS always uses `read_committed` isolation, so aborted changelog
//! writes are never replayed.
//!
//! Committed source offsets are surfaced through `OffsetFetch` once the
//! transaction's COMMIT marker lands, so a restarted instance resumes from the
//! committed offset (the committed input is processed **exactly once across the
//! restart** — it is not re-read/double-counted), rebuilding its stores from the
//! committed changelog.
//!
//! [`StreamThread`]: runtime
//!
//! ```no_run
//! use crabka_client_streams::{
//!     KafkaStreams, ProcessingGuarantee, StringSerde, Topology,
//! };
//! # async fn run() -> Result<(), Box<dyn std::error::Error>> {
//! let mut topo = Topology::new();
//! let src = topo.add_source("src", ["in"], (StringSerde, StringSerde));
//! topo.add_sink("out", "out", [&src], (StringSerde, StringSerde));
//! let built = topo.build("my-app")?;
//!
//! // Opt into exactly-once: output + changelog + source offsets commit atomically.
//! let mut streams = KafkaStreams::builder()
//!     .bootstrap("localhost:9092")
//!     .application_id("my-app")
//!     .topology(built)
//!     .processing_guarantee(ProcessingGuarantee::ExactlyOnceV2)
//!     .build()
//!     .await?;
//! streams.close().await?;
//! # Ok(())
//! # }
//! ```
#![doc(html_root_url = "https://docs.rs/crabka-client-streams/0.3.2")]

pub mod dsl;
mod error;
pub mod membership;
pub mod processor;
pub mod runtime;
pub mod store;
pub mod test_driver;
pub mod topology;

pub use dsl::{
    BranchedStream, BufferConfig, GlobalKTable, Grouped, JoinWindows, KGroupedStream, KStream,
    KTable, Materialized, Repartitioned, SessionWindowedKGroupedStream, SessionWindowedSerde,
    SessionWindows, StreamJoined, StreamsBuilder, Suppressed, TimeWindowedKGroupedStream,
    TimeWindowedSerde, TimeWindows, Window, Windowed,
};
pub use error::StreamsClientError;
pub use membership::{
    StreamsAssignment, StreamsEvent, StreamsMembership, StreamsStatus, TaskAssignment,
    TaskOffsetTracker, TopicPartition,
};
pub use processor::{
    BytesSerde, Cancellable, Consumed, DefaultSerde, FixedKeyProcessor, FixedKeyProcessorContext,
    FixedKeyProcessorSupplier, FixedKeyRecord, I64Serde, Processor, ProcessorContext,
    ProcessorError, ProcessorSupplier, Produced, PunctuationType, Punctuator, Record,
    RecordContext, Serde, SerdeError, StringSerde,
};
pub use runtime::eos::ProcessingGuarantee;
pub use runtime::iq::IqError;
pub use runtime::{
    KafkaStreams, KafkaStreamsState, ReadOnlyKeyValueStore, ReadOnlySessionStore,
    ReadOnlyWindowStore,
};
pub use store::iq::StoreKind;
pub use store::{KeyValueBytesStore, KeyValueStore, StateStore, StoreBackend};
pub use test_driver::TopologyTestDriver;
pub use topology::{BuiltTopology, NodeHandle, Topology, TopologyError};