1use kcl::checkpointer::Checkpointer;
2use kcl::reader::StdinReader;
3use kcl::writer::StdoutWriter;
4use kcl::{run, Processor, Record};
5use serde::Deserialize;
6
7#[derive(Debug, Deserialize)]
8struct MyPayload {
9 event_field: String,
10}
11
12struct MyConsumer;
13
14impl Processor<StdoutWriter, StdinReader> for MyConsumer {
15 fn initialize(&mut self, _shard_id: &str) {}
16
17 fn process_records(
18 &mut self,
19 data: &[Record],
20 checkpointer: &mut Checkpointer<StdoutWriter, StdinReader>,
21 ) {
22 for record in data {
23 match record.json::<MyPayload>() {
24 Ok(data) => println!("{:?}", data.event_field),
25 Err(e) => println!("{:?}", e),
26 }
27 }
28 checkpointer
29 .checkpoint(None, None)
30 .expect("Checkpoint to succeed.");
31 }
32 fn lease_lost(&mut self) {}
33 fn shard_ended(&mut self, checkpointer: &mut Checkpointer<StdoutWriter, StdinReader>) {
34 checkpointer
35 .checkpoint(None, None)
36 .expect("Checkpoint to succeed.");
37 }
38 fn shutdown_requested(&mut self, checkpointer: &mut Checkpointer<StdoutWriter, StdinReader>) {
39 checkpointer
40 .checkpoint(None, None)
41 .expect("Checkpoint to succeed.");
42 }
43}
44
45fn main() {
46 run(&mut MyConsumer {});
47}