scouter_server/kafka/
consumer.rs1use crate::sql::postgres::PostgresClient;
2use crate::sql::schema::DriftRecord;
3use anyhow::*;
4use rdkafka::config::ClientConfig;
5use rdkafka::consumer::Consumer;
6
7use futures::StreamExt;
8use rdkafka::consumer::CommitMode;
9use rdkafka::consumer::StreamConsumer;
10use rdkafka::Message;
11use std::collections::HashMap;
12use std::result::Result::Ok;
13use tracing::error;
14use tracing::info;
15
16pub enum MessageHandler {
19 Postgres(PostgresClient),
20}
21
22impl MessageHandler {
23 pub async fn insert_drift_record(&self, records: &DriftRecord) -> Result<()> {
24 match self {
25 Self::Postgres(client) => {
26 let result = client.insert_drift_record(records).await;
27 match result {
28 Ok(_) => (),
29 Err(e) => {
30 error!("Failed to insert drift record: {:?}", e);
31 }
32 }
33 }
34 }
35
36 Ok(())
37 }
38}
39
40#[allow(clippy::too_many_arguments)]
41#[allow(clippy::unnecessary_unwrap)]
42pub async fn create_kafka_consumer(
43 group_id: String,
44 brokers: String,
45 topics: Vec<String>,
46 username: Option<String>,
47 password: Option<String>,
48 security_protocol: Option<String>,
49 sasl_mechanism: Option<String>,
50 config_overrides: Option<HashMap<&str, &str>>,
51) -> Result<StreamConsumer, anyhow::Error> {
52 info!("Setting up Kafka consumer");
53
54 let mut config = ClientConfig::new();
55
56 config
57 .set("group.id", group_id)
58 .set("bootstrap.servers", brokers)
59 .set("enable.partition.eof", "false")
60 .set("session.timeout.ms", "6000")
61 .set("enable.auto.commit", "true");
62
63 if username.is_some() && password.is_some() {
64 config
65 .set("security.protocol", security_protocol.unwrap())
66 .set("sasl.mechanisms", sasl_mechanism.unwrap())
67 .set("sasl.username", username.unwrap())
68 .set("sasl.password", password.unwrap());
69 }
70
71 if let Some(overrides) = config_overrides {
72 for (key, value) in overrides {
73 config.set(key, value);
74 }
75 }
76
77 let consumer: StreamConsumer = config.create().expect("Consumer creation error");
78
79 let topics = topics.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
80
81 consumer
82 .subscribe(&topics)
83 .expect("Can't subscribe to specified topics");
84
85 Ok(consumer)
86}
87
88pub async fn stream_from_kafka_topic(
89 message_handler: &MessageHandler,
90 consumer: &StreamConsumer,
91) -> Result<(), anyhow::Error> {
92 let mut stream = consumer.stream();
93 let message = stream.next().await;
94
95 match message {
96 Some(Ok(msg)) => {
97 let payload = msg.payload().unwrap();
98 let record: DriftRecord = serde_json::from_slice(payload).unwrap();
99 let inserted = message_handler.insert_drift_record(&record).await;
100 match inserted {
101 Ok(_) => {
102 consumer.commit_message(&msg, CommitMode::Async).unwrap();
103 }
104 Err(e) => {
105 error!("Failed to insert drift record: {:?}", e);
106 }
107 }
108 }
109 Some(Err(e)) => {
110 error!("Failed to receive message: {:?}", e);
111 }
112 None => {
113 error!("No message received");
114 }
115 }
116
117 Ok(())
118}
119
120#[allow(clippy::unnecessary_unwrap)]
140#[allow(clippy::too_many_arguments)]
141pub async fn start_kafka_background_poll(
142 message_handler: MessageHandler,
143 group_id: String,
144 brokers: String,
145 topics: Vec<String>,
146 username: Option<String>,
147 password: Option<String>,
148 security_protocol: Option<String>,
149 sasl_mechanism: Option<String>,
150) -> Result<(), anyhow::Error> {
151 let consumer = create_kafka_consumer(
152 group_id,
153 brokers,
154 topics,
155 username,
156 password,
157 security_protocol,
158 sasl_mechanism,
159 None,
160 )
161 .await
162 .unwrap();
163
164 loop {
165 stream_from_kafka_topic(&message_handler, &consumer).await?;
166 }
167}