example_consumer/
main.rs

1use kcl::checkpointer::Checkpointer;
2use kcl::reader::StdinReader;
3use kcl::writer::StdoutWriter;
4use kcl::{run, Processor, Record};
5use serde::Deserialize;
6
7#[derive(Debug, Deserialize)]
8struct MyPayload {
9    event_field: String,
10}
11
/// Record processor used by this example.
///
/// It is a unit struct: the consumer keeps no state between callbacks.
struct MyConsumer;
13
14impl Processor<StdoutWriter, StdinReader> for MyConsumer {
15    fn initialize(&mut self, _shard_id: &str) {}
16
17    fn process_records(
18        &mut self,
19        data: &[Record],
20        checkpointer: &mut Checkpointer<StdoutWriter, StdinReader>,
21    ) {
22        for record in data {
23            match record.json::<MyPayload>() {
24                Ok(data) => println!("{:?}", data.event_field),
25                Err(e) => println!("{:?}", e),
26            }
27        }
28        checkpointer
29            .checkpoint(None, None)
30            .expect("Checkpoint to succeed.");
31    }
32    fn lease_lost(&mut self) {}
33    fn shard_ended(&mut self, checkpointer: &mut Checkpointer<StdoutWriter, StdinReader>) {
34        checkpointer
35            .checkpoint(None, None)
36            .expect("Checkpoint to succeed.");
37    }
38    fn shutdown_requested(&mut self, checkpointer: &mut Checkpointer<StdoutWriter, StdinReader>) {
39        checkpointer
40            .checkpoint(None, None)
41            .expect("Checkpoint to succeed.");
42    }
43}
44
45fn main() {
46    run(&mut MyConsumer {});
47}