scouter_server/kafka/
consumer.rs

1use crate::sql::postgres::PostgresClient;
2use crate::sql::schema::DriftRecord;
3use anyhow::*;
4use rdkafka::config::ClientConfig;
5use rdkafka::consumer::Consumer;
6
7use futures::StreamExt;
8use rdkafka::consumer::CommitMode;
9use rdkafka::consumer::StreamConsumer;
10use rdkafka::Message;
11use std::collections::HashMap;
12use std::result::Result::Ok;
13use tracing::error;
14use tracing::info;
15
16// Get table name constant
17
18pub enum MessageHandler {
19    Postgres(PostgresClient),
20}
21
22impl MessageHandler {
23    pub async fn insert_drift_record(&self, records: &DriftRecord) -> Result<()> {
24        match self {
25            Self::Postgres(client) => {
26                let result = client.insert_drift_record(records).await;
27                match result {
28                    Ok(_) => (),
29                    Err(e) => {
30                        error!("Failed to insert drift record: {:?}", e);
31                    }
32                }
33            }
34        }
35
36        Ok(())
37    }
38}
39
40#[allow(clippy::too_many_arguments)]
41#[allow(clippy::unnecessary_unwrap)]
42pub async fn create_kafka_consumer(
43    group_id: String,
44    brokers: String,
45    topics: Vec<String>,
46    username: Option<String>,
47    password: Option<String>,
48    security_protocol: Option<String>,
49    sasl_mechanism: Option<String>,
50    config_overrides: Option<HashMap<&str, &str>>,
51) -> Result<StreamConsumer, anyhow::Error> {
52    info!("Setting up Kafka consumer");
53
54    let mut config = ClientConfig::new();
55
56    config
57        .set("group.id", group_id)
58        .set("bootstrap.servers", brokers)
59        .set("enable.partition.eof", "false")
60        .set("session.timeout.ms", "6000")
61        .set("enable.auto.commit", "true");
62
63    if username.is_some() && password.is_some() {
64        config
65            .set("security.protocol", security_protocol.unwrap())
66            .set("sasl.mechanisms", sasl_mechanism.unwrap())
67            .set("sasl.username", username.unwrap())
68            .set("sasl.password", password.unwrap());
69    }
70
71    if let Some(overrides) = config_overrides {
72        for (key, value) in overrides {
73            config.set(key, value);
74        }
75    }
76
77    let consumer: StreamConsumer = config.create().expect("Consumer creation error");
78
79    let topics = topics.iter().map(|s| s.as_str()).collect::<Vec<&str>>();
80
81    consumer
82        .subscribe(&topics)
83        .expect("Can't subscribe to specified topics");
84
85    Ok(consumer)
86}
87
88pub async fn stream_from_kafka_topic(
89    message_handler: &MessageHandler,
90    consumer: &StreamConsumer,
91) -> Result<(), anyhow::Error> {
92    let mut stream = consumer.stream();
93    let message = stream.next().await;
94
95    match message {
96        Some(Ok(msg)) => {
97            let payload = msg.payload().unwrap();
98            let record: DriftRecord = serde_json::from_slice(payload).unwrap();
99            let inserted = message_handler.insert_drift_record(&record).await;
100            match inserted {
101                Ok(_) => {
102                    consumer.commit_message(&msg, CommitMode::Async).unwrap();
103                }
104                Err(e) => {
105                    error!("Failed to insert drift record: {:?}", e);
106                }
107            }
108        }
109        Some(Err(e)) => {
110            error!("Failed to receive message: {:?}", e);
111        }
112        None => {
113            error!("No message received");
114        }
115    }
116
117    Ok(())
118}
119
120// Start background task to poll kafka topic
121//
122// This function will poll the kafka topic and insert the records into the database
123// using the provided message handler.
124//
125// # Arguments
126//
127// * `message_handler` - The message handler to process the records
128// * `group_id` - The kafka consumer group id
129// * `brokers` - The kafka brokers
130// * `topics` - The kafka topics to subscribe to
131// * `username` - The kafka username
132// * `password` - The kafka password
133// * `security_protocol` - The kafka security protocol
134// * `sasl_mechanism` - The kafka SASL mechanism
135//
136// # Returns
137//
138// * `Result<(), anyhow::Error>` - The result of the operation
139#[allow(clippy::unnecessary_unwrap)]
140#[allow(clippy::too_many_arguments)]
141pub async fn start_kafka_background_poll(
142    message_handler: MessageHandler,
143    group_id: String,
144    brokers: String,
145    topics: Vec<String>,
146    username: Option<String>,
147    password: Option<String>,
148    security_protocol: Option<String>,
149    sasl_mechanism: Option<String>,
150) -> Result<(), anyhow::Error> {
151    let consumer = create_kafka_consumer(
152        group_id,
153        brokers,
154        topics,
155        username,
156        password,
157        security_protocol,
158        sasl_mechanism,
159        None,
160    )
161    .await
162    .unwrap();
163
164    loop {
165        stream_from_kafka_topic(&message_handler, &consumer).await?;
166    }
167}