yozefu_app/
app.rs

1//! This app is both a kafka consumer and a kafka admin client.
2use lib::{
3    ConsumerGroupDetail, Error, ExportedKafkaRecord, KafkaRecord, TopicConfig, TopicDetail,
4    kafka::SchemaRegistryClient, search::offset::FromOffset,
5};
6use rdkafka::{
7    Offset, TopicPartitionList,
8    consumer::{BaseConsumer, Consumer, StreamConsumer},
9};
10use thousands::Separable;
11use tracing::{info, warn};
12
13use std::{collections::HashSet, fs, time::Duration};
14
15use itertools::Itertools;
16
17use crate::{
18    AdminClient,
19    configuration::{Configuration, ConsumerConfig, InternalConfig, YozefuConfig},
20    search::{Search, ValidSearchQuery},
21};
22
/// Struct exposing different functions for consuming kafka records.
/// It acts both as a kafka consumer and as a kafka admin client.
#[derive(Debug, Clone)]
pub struct App {
    /// Name of the kafka cluster this app is bound to (key into the configuration).
    pub cluster: String,
    /// Internal configuration: kafka client properties, schema registry, output file...
    pub config: InternalConfig,
    /// The validated search query driving consumption (starting offset, filters).
    pub search_query: ValidSearchQuery,
    //pub output_file: PathBuf,
}
31
32impl App {
33    pub fn new(
34        cluster: String,
35        config: InternalConfig,
36        search_query: ValidSearchQuery,
37        //     output_file: PathBuf,
38    ) -> Self {
39        Self {
40            cluster,
41            config,
42            search_query,
43            //    output_file,
44        }
45    }
46
47    pub fn schema_registry(&self) -> Option<SchemaRegistryClient> {
48        match self.config.schema_registry_config_of(&self.cluster) {
49            Some(config) => Some(SchemaRegistryClient::new(config.url, &config.headers)),
50            None => None,
51        }
52    }
53
    /// Create a kafka consumer assigned to every partition of `topics`.
    ///
    /// The starting position comes from the search query and defaults to
    /// `FromOffset::End` (consume only new records) when the query does not
    /// specify one.
    pub fn create_consumer(&self, topics: &Vec<String>) -> Result<StreamConsumer, Error> {
        let offset = self.search_query.offset().unwrap_or(FromOffset::End);
        match offset {
            FromOffset::Beginning => self.assign_partitions(topics, Offset::Beginning),
            FromOffset::End => self.assign_partitions(topics, Offset::End),
            FromOffset::Offset(o) => self.assign_partitions(topics, Offset::Offset(o)),
            FromOffset::OffsetTail(o) => self.assign_partitions(topics, Offset::OffsetTail(o)),
            FromOffset::Timestamp(timestamp) => {
                // Timestamps must first be resolved to per-partition offsets,
                // so this branch cannot reuse `assign_partitions`.
                let consumer: StreamConsumer = self.config.create_kafka_consumer()?;
                let mut tp = TopicPartitionList::new();
                for t in topics {
                    let metadata = consumer.fetch_metadata(Some(t), Duration::from_secs(10))?;
                    for m in metadata.topics() {
                        for p in m.partitions() {
                            tp.add_partition(m.name(), p.id());
                        }
                    }
                }
                // rdkafka convention: `offsets_for_times` expects the timestamp
                // to be carried as an `Offset::Offset` value on every partition,
                // and returns the list with real offsets filled in.
                tp.set_all_offsets(Offset::Offset(timestamp))?;
                let tt = consumer.offsets_for_times(tp, Duration::from_secs(60))?;
                consumer.assign(&tt)?;
                Ok(consumer)
            }
        }
    }
80
81    pub fn consumer_config(&self) -> ConsumerConfig {
82        self.config.consumer_config(&self.cluster)
83    }
84
85    /// Exports a given kafka record to a file.
86    /// The Name of the file is automatically generated at the runtime
87    pub fn export_record(&self, record: &KafkaRecord) -> Result<(), Error> {
88        let output_file = self.config.output_file();
89        fs::create_dir_all(output_file.parent().unwrap())?;
90        let content = fs::read_to_string(output_file).unwrap_or("[]".to_string());
91        let mut exported_records: Vec<ExportedKafkaRecord> = serde_json::from_str(&content)?;
92
93        let mut exported_record_kafka: ExportedKafkaRecord = record.into();
94        exported_record_kafka.set_search_query(self.search_query.query());
95        exported_records.push(exported_record_kafka);
96        exported_records.sort_by(|a, b| {
97            a.record
98                .timestamp
99                .cmp(&b.record.timestamp)
100                .then(a.record.offset.cmp(&b.record.offset))
101        });
102        exported_records.dedup();
103        for i in 1..exported_records.len() {
104            let first_ts = exported_records.first().unwrap().record.timestamp;
105            let previous_ts = exported_records.get(i - 1).unwrap().record.timestamp;
106            let current = exported_records.get_mut(i).unwrap();
107            current.compute_deltas_ms(first_ts, previous_ts);
108        }
109
110        fs::write(
111            output_file,
112            serde_json::to_string_pretty(&exported_records)?,
113        )?;
114        info!(
115            "A record has been exported into file '{}'",
116            output_file.display()
117        );
118        Ok(())
119    }
120
121    /// Calculates an estimate of the number of records that are going to be read.
122    /// This function is used to render a progress bar.
123    pub fn estimate_number_of_records_to_read(
124        &self,
125        topic_partition_list: &TopicPartitionList,
126    ) -> Result<i64, Error> {
127        let client: StreamConsumer = self.create_assigned_consumer()?;
128        let mut count = 0;
129        for t in topic_partition_list.elements() {
130            // this function call be very slow
131            let watermarks: (i64, i64) =
132                match client.fetch_watermarks(t.topic(), t.partition(), Duration::from_secs(10)) {
133                    Ok(i) => i,
134                    Err(e) => {
135                        warn!(
136                            "I was not able to fetch watermarks of topic '{}', partition {}: {}",
137                            t.partition(),
138                            t.topic(),
139                            e
140                        );
141                        (0, 0)
142                    }
143                };
144            count += match t.offset() {
145                Offset::Beginning => watermarks.1 - watermarks.0,
146                Offset::End => 0,
147                Offset::Stored => 1,
148                Offset::Invalid => 1,
149                Offset::Offset(o) => watermarks.1 - o,
150                Offset::OffsetTail(o) => o,
151            }
152        }
153
154        info!(
155            "{} records are about to be consumed on the following topic partitions: [{}]",
156            count.separate_with_underscores(),
157            topic_partition_list
158                .elements()
159                .iter()
160                .map(|e| format!("{}-{}", e.topic(), e.partition()))
161                .join(", ")
162        );
163        Ok(count)
164    }
165
    /// Creates a plain kafka consumer from the internal configuration.
    /// NOTE(review): despite the name, no partitions are assigned here —
    /// assignment happens in `assign_partitions` / `create_consumer`.
    fn create_assigned_consumer(&self) -> Result<StreamConsumer, Error> {
        self.config.create_kafka_consumer()
    }
169
170    /// Assigns topics to a consumer
171    fn assign_partitions(
172        &self,
173        topics: &Vec<String>,
174        offset: Offset,
175    ) -> Result<StreamConsumer, Error> {
176        let consumer = self.create_assigned_consumer()?;
177        let mut assignments = TopicPartitionList::new();
178        for topic in topics {
179            let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
180            for t in metadata.topics() {
181                for p in t.partitions() {
182                    assignments.add_partition_offset(topic, p.id(), offset)?;
183                }
184            }
185        }
186        consumer.assign(&assignments)?;
187        info!("New Consumer created, about to consume {topics:?}");
188        Ok(consumer)
189    }
190
191    /// Returns the topics details for a given list topics
192    /// This function is not ready yet
193    pub fn topic_details(&self, topics: HashSet<String>) -> Result<Vec<TopicDetail>, Error> {
194        let mut results = vec![];
195        for topic in topics {
196            let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
197            let metadata = consumer.fetch_metadata(Some(&topic), Duration::from_secs(10))?;
198            let metadata = metadata.topics().first().unwrap();
199            let mut detail = TopicDetail {
200                name: topic.clone(),
201                replicas: metadata.partitions().first().unwrap().replicas().len(),
202                partitions: metadata.partitions().len(),
203                consumer_groups: vec![],
204                count: self.count_records_in_topic(&topic)?,
205                config: None,
206            };
207            let mut consumer_groups = vec![];
208            let metadata = consumer.fetch_group_list(None, Duration::from_secs(10))?;
209            for g in metadata.groups() {
210                consumer_groups.push(ConsumerGroupDetail {
211                    name: g.name().to_string(),
212                    members: vec![], //Self::parse_members(g, g.members())?,
213                    state: g.state().parse()?,
214                });
215            }
216            detail.consumer_groups = consumer_groups;
217            results.push(detail);
218        }
219
220        Ok(results)
221    }
222
223    pub async fn topic_config_of(&self, topic: &str) -> Result<Option<TopicConfig>, Error> {
224        AdminClient::new(self.config.client_config())?
225            .topic_config(topic)
226            .await
227    }
228
229    pub fn count_records_in_topic(&self, topic: &str) -> Result<i64, Error> {
230        let mut count = 0;
231        let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
232        let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
233        let metadata_topic = metadata.topics().first();
234        if metadata_topic.is_none() {
235            return Ok(0);
236        }
237
238        let metadata_topic = metadata_topic.unwrap();
239        for partition in metadata_topic.partitions() {
240            let watermarks =
241                consumer.fetch_watermarks(topic, partition.id(), Duration::from_secs(10))?;
242            count += watermarks.1 - watermarks.0;
243        }
244
245        Ok(count)
246    }
247
248    /// Lists available kafka topics on the cluster.
249    pub fn list_topics(&self) -> Result<Vec<String>, Error> {
250        let consumer: StreamConsumer = self.create_assigned_consumer()?;
251        let metadata = consumer.fetch_metadata(None, Duration::from_secs(10))?;
252        let topics = metadata
253            .topics()
254            .iter()
255            .map(|t| t.name().to_string())
256            .collect_vec();
257        Ok(topics)
258    }
259
260    // TODO https://github.com/fede1024/rust-rdkafka/pull/680
261    //    pub fn parse_members(
262    //        group: &GroupInfo,
263    //        members: &[GroupMemberInfo],
264    //    ) -> Result<Vec<ConsumerGroupMember>, anyhow::Error> {
265    //        return Ok(vec![]);
266    //        let members = members
267    //            .iter()
268    //            .map(|member| {
269    //                let mut assigns = Vec::new();
270    //                if group.protocol_type() == "consumer" {
271    //                    if let Some(assignment) = member.assignment() {
272    //                        let mut payload_rdr = Cursor::new(assignment);
273    //                        assigns = Self::parse_member_assignment(&mut payload_rdr)
274    //                            .expect("Parse member assignment failed");
275    //                    }
276    //                }
277    //                ConsumerGroupMember {
278    //                    member: member.id().to_owned(),
279    //                    start_offset: 0,
280    //                    end_offset: 0,
281    //                    assignments: assigns,
282    //                }
283    //            })
284    //            .collect::<Vec<_>>();
285    //
286    //        Ok(members)
287    //    }
288    //
289    //    fn parse_member_assignment(
290    //        payload_rdr: &mut Cursor<&[u8]>,
291    //    ) -> Result<Vec<MemberAssignment>, anyhow::Error> {
292    //        return Ok(vec![]);
293    //        let _version = payload_rdr.read_i16::<BigEndian>()?;
294    //        let assign_len = payload_rdr.read_i32::<BigEndian>()?;
295    //        let mut assigns = Vec::with_capacity(assign_len as usize);
296    //        for _ in 0..assign_len {
297    //            let topic = read_str(payload_rdr)?.to_owned();
298    //            let partition_len = payload_rdr.read_i32::<BigEndian>()?;
299    //            let mut partitions = Vec::with_capacity(partition_len as usize);
300    //            for _ in 0..partition_len {
301    //                let partition = payload_rdr.read_i32::<BigEndian>()?;
302    //                partitions.push(partition);
303    //            }
304    //            assigns.push(MemberAssignment { topic, partitions })
305    //        }
306    //        Ok(assigns)
307    //    }
308
309    /// Lists available topics on the cluster with a custom kafka client.
310    pub fn list_topics_from_client(yozefu_config: &YozefuConfig) -> Result<Vec<String>, Error> {
311        let consumer: StreamConsumer = yozefu_config.create_kafka_consumer()?;
312        let metadata = consumer.fetch_metadata(None, Duration::from_secs(3))?;
313        let topics = metadata
314            .topics()
315            .iter()
316            .map(|t| t.name().to_string())
317            .collect_vec();
318        Ok(topics)
319    }
320}
321
322//fn read_str<'a>(rdr: &'a mut Cursor<&[u8]>) -> Result<&'a str, Error> {
323//    let len = (rdr.read_i16::<BigEndian>())? as usize;
324//    let pos = rdr.position() as usize;
325//    let slice = str::from_utf8(&rdr.get_ref()[pos..(pos + len)])?;
326//    rdr.consume(len);
327//    Ok(slice)
328//}
329//