yozefu_app/
app.rs

//! This app is both a Kafka consumer and a Kafka admin client.
use lib::{
    ConsumerGroupDetail, Error, ExportedKafkaRecord, KafkaRecord, TopicDetail,
    kafka::SchemaRegistryClient, search::offset::FromOffset,
};
use log::{info, warn};
use rdkafka::{
    Offset, TopicPartitionList,
    consumer::{BaseConsumer, Consumer, StreamConsumer},
};
use thousands::Separable;

use std::{collections::HashSet, fs, time::Duration};

use itertools::Itertools;

use crate::{
    configuration::{Configuration, InternalConfig, YozefuConfig},
    search::{Search, ValidSearchQuery},
};
/// Struct exposing different functions for consuming Kafka records.
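/// # Example
/// A minimal sketch; `config` and `query` stand in for values built
/// from your own configuration:
/// ```ignore
/// let app = App::new("localhost".to_string(), config, query);
/// let topics = app.list_topics()?;
/// ```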
#[derive(Debug, Clone)]
pub struct App {
    pub cluster: String,
    pub config: InternalConfig,
    pub search_query: ValidSearchQuery,
    //pub output_file: PathBuf,
}

impl App {
    pub fn new(
        cluster: String,
        config: InternalConfig,
        search_query: ValidSearchQuery,
        //     output_file: PathBuf,
    ) -> Self {
        Self {
            cluster,
            config,
            search_query,
            //    output_file,
        }
    }

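    /// Returns a schema registry client when one is configured for the current cluster.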
    pub fn schema_registry(&self) -> Option<SchemaRegistryClient> {
        self.config
            .schema_registry_config_of(&self.cluster)
            .map(|config| SchemaRegistryClient::new(config.url, &config.headers))
    }

    /// Creates a Kafka consumer starting from the offset specified
    /// in the search query. Defaults to the end of the topics.
    pub fn create_consumer(&self, topics: &[String]) -> Result<StreamConsumer, Error> {
        let offset = self.search_query.offset().unwrap_or(FromOffset::End);
        match offset {
            FromOffset::Beginning => self.assign_partitions(topics, Offset::Beginning),
            FromOffset::End => self.assign_partitions(topics, Offset::End),
            FromOffset::Offset(o) => self.assign_partitions(topics, Offset::Offset(o)),
            FromOffset::OffsetTail(o) => self.assign_partitions(topics, Offset::OffsetTail(o)),
            FromOffset::Timestamp(timestamp) => {
                let consumer: StreamConsumer = self.config.create_kafka_consumer()?;
                let mut tp = TopicPartitionList::new();
                for t in topics {
                    let metadata = consumer.fetch_metadata(Some(t), Duration::from_secs(10))?;
                    for m in metadata.topics() {
                        for p in m.partitions() {
                            tp.add_partition(m.name(), p.id());
                        }
                    }
                }
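                // `offsets_for_times` expects the timestamp to be carried in the
                // offset field of each entry: every partition is resolved to the
                // earliest offset whose record timestamp is >= that value.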
                tp.set_all_offsets(Offset::Offset(timestamp))?;
                let tt = consumer.offsets_for_times(tp, Duration::from_secs(60))?;
                consumer.assign(&tt)?;
                Ok(consumer)
            }
        }
    }

    /// Exports a given Kafka record to a file.
    /// The name of the file is generated at runtime.
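    /// Records are stored as a JSON array, sorted by timestamp and then offset.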
    pub fn export_record(&self, record: &KafkaRecord) -> Result<(), Error> {
        let output_file = self.config.output_file();
        fs::create_dir_all(output_file.parent().unwrap())?;
        let content = fs::read_to_string(output_file).unwrap_or_else(|_| "[]".to_string());
        let mut exported_records: Vec<ExportedKafkaRecord> = serde_json::from_str(&content)?;

        let mut exported_record_kafka: ExportedKafkaRecord = record.into();
        exported_record_kafka.set_search_query(self.search_query.query());
        exported_records.push(exported_record_kafka);
        exported_records.sort_by(|a, b| {
            a.record
                .timestamp
                .cmp(&b.record.timestamp)
                .then(a.record.offset.cmp(&b.record.offset))
        });
        exported_records.dedup();
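        // Recompute the time deltas of every record relative to the first
        // and to the previous record in the sorted list.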
        for i in 1..exported_records.len() {
            let first_ts = exported_records.first().unwrap().record.timestamp;
            let previous_ts = exported_records.get(i - 1).unwrap().record.timestamp;
            let current = exported_records.get_mut(i).unwrap();
            current.compute_deltas_ms(first_ts, previous_ts);
        }

        fs::write(
            output_file,
            serde_json::to_string_pretty(&exported_records)?,
        )?;
        info!(
            "A record has been exported into file '{}'",
            output_file.display()
        );
        Ok(())
    }

    /// Calculates an estimate of the number of records that are going to be read.
    /// This function is used to render a progress bar.
    pub fn estimate_number_of_records_to_read(
        &self,
        topic_partition_list: TopicPartitionList,
    ) -> Result<i64, Error> {
        let client: StreamConsumer = self.create_assigned_consumer()?;
        let mut count = 0;
        for t in topic_partition_list.elements() {
            // This call can be very slow
            let watermarks: (i64, i64) =
                match client.fetch_watermarks(t.topic(), t.partition(), Duration::from_secs(10)) {
                    Ok(i) => i,
                    Err(e) => {
                        warn!(
                            "I was not able to fetch watermarks of topic '{}', partition {}: {}",
                            t.topic(),
                            t.partition(),
                            e
                        );
                        (0, 0)
                    }
                };
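            // Watermarks are (low, high): the estimate depends on where
            // consumption starts for this partition.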
            count += match t.offset() {
                Offset::Beginning => watermarks.1 - watermarks.0,
                Offset::End => 0,
                Offset::Stored => 1,
                Offset::Invalid => 1,
                Offset::Offset(o) => watermarks.1 - o,
                Offset::OffsetTail(o) => o,
            }
        }

        info!(
            "{} records are about to be consumed on the following topic partitions: [{}]",
            count.separate_with_underscores(),
            topic_partition_list
                .elements()
                .iter()
                .map(|e| format!("{}-{}", e.topic(), e.partition()))
                .join(", ")
        );
        Ok(count)
    }

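    /// Creates a plain Kafka consumer. Despite the name, no partitions are
    /// assigned here; assignment happens in `assign_partitions`.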
    fn create_assigned_consumer(&self) -> Result<StreamConsumer, Error> {
        self.config.create_kafka_consumer()
    }

    /// Assigns all partitions of the given topics to a new consumer.
    fn assign_partitions(
        &self,
        topics: &[String],
        offset: Offset,
    ) -> Result<StreamConsumer, Error> {
        let consumer = self.create_assigned_consumer()?;
        let mut assignments = TopicPartitionList::new();
        for topic in topics {
            let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
            for t in metadata.topics() {
                for p in t.partitions() {
                    assignments.add_partition_offset(topic, p.id(), offset)?;
                }
            }
        }
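        // `assign` attaches the partitions directly, without joining a
        // consumer group the way `subscribe` would.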
        consumer.assign(&assignments)?;
        info!("New consumer created, about to consume {:?}", topics);
        Ok(consumer)
    }

    /// Returns the topic details for a given list of topics.
    /// This function is not ready yet.
    pub fn topic_details(&self, topics: HashSet<String>) -> Result<Vec<TopicDetail>, Error> {
        let mut results = vec![];
        for topic in topics {
            let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
            let metadata = consumer.fetch_metadata(Some(&topic), Duration::from_secs(10))?;
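            // Metadata was fetched for a single topic, so exactly one entry is expected.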
            let metadata = metadata.topics().first().unwrap();
            let mut detail = TopicDetail {
                name: topic.clone(),
                replicas: metadata.partitions().first().unwrap().replicas().len(),
                partitions: metadata.partitions().len(),
                consumer_groups: vec![],
                count: self.count_records_in_topic(&topic)?,
            };
            let mut consumer_groups = vec![];
            let metadata = consumer.fetch_group_list(None, Duration::from_secs(10))?;
            for g in metadata.groups() {
                consumer_groups.push(ConsumerGroupDetail {
                    name: g.name().to_string(),
                    members: vec![], //Self::parse_members(g, g.members())?,
                    state: g.state().parse()?,
                });
            }

            detail.consumer_groups = consumer_groups;
            results.push(detail);
        }

        Ok(results)
    }

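    /// Counts the records currently stored in a topic by summing
    /// high minus low watermarks over all its partitions.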
    pub fn count_records_in_topic(&self, topic: &str) -> Result<i64, Error> {
        let mut count = 0;
        let consumer: BaseConsumer = self.config.create_kafka_consumer()?;
        let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(10))?;
        let Some(metadata_topic) = metadata.topics().first() else {
            return Ok(0);
        };

        for partition in metadata_topic.partitions() {
            let watermarks =
                consumer.fetch_watermarks(topic, partition.id(), Duration::from_secs(10))?;
            count += watermarks.1 - watermarks.0;
        }

        Ok(count)
    }

    /// Lists the available Kafka topics on the cluster.
    pub fn list_topics(&self) -> Result<Vec<String>, Error> {
        let consumer: StreamConsumer = self.create_assigned_consumer()?;
        let metadata = consumer.fetch_metadata(None, Duration::from_secs(10))?;
        let topics = metadata
            .topics()
            .iter()
            .map(|t| t.name().to_string())
            .collect_vec();
        Ok(topics)
    }

    // TODO https://github.com/fede1024/rust-rdkafka/pull/680
    //    pub fn parse_members(
    //        group: &GroupInfo,
    //        members: &[GroupMemberInfo],
    //    ) -> Result<Vec<ConsumerGroupMember>, anyhow::Error> {
    //        return Ok(vec![]);
    //        let members = members
    //            .iter()
    //            .map(|member| {
    //                let mut assigns = Vec::new();
    //                if group.protocol_type() == "consumer" {
    //                    if let Some(assignment) = member.assignment() {
    //                        let mut payload_rdr = Cursor::new(assignment);
    //                        assigns = Self::parse_member_assignment(&mut payload_rdr)
    //                            .expect("Parse member assignment failed");
    //                    }
    //                }
    //                ConsumerGroupMember {
    //                    member: member.id().to_owned(),
    //                    start_offset: 0,
    //                    end_offset: 0,
    //                    assignments: assigns,
    //                }
    //            })
    //            .collect::<Vec<_>>();
    //
    //        Ok(members)
    //    }
    //
    //    fn parse_member_assignment(
    //        payload_rdr: &mut Cursor<&[u8]>,
    //    ) -> Result<Vec<MemberAssignment>, anyhow::Error> {
    //        return Ok(vec![]);
    //        let _version = payload_rdr.read_i16::<BigEndian>()?;
    //        let assign_len = payload_rdr.read_i32::<BigEndian>()?;
    //        let mut assigns = Vec::with_capacity(assign_len as usize);
    //        for _ in 0..assign_len {
    //            let topic = read_str(payload_rdr)?.to_owned();
    //            let partition_len = payload_rdr.read_i32::<BigEndian>()?;
    //            let mut partitions = Vec::with_capacity(partition_len as usize);
    //            for _ in 0..partition_len {
    //                let partition = payload_rdr.read_i32::<BigEndian>()?;
    //                partitions.push(partition);
    //            }
    //            assigns.push(MemberAssignment { topic, partitions })
    //        }
    //        Ok(assigns)
    //    }

    /// Lists the available topics on the cluster using a custom Kafka client.
    pub fn list_topics_from_client(yozefu_config: &YozefuConfig) -> Result<Vec<String>, Error> {
        let consumer: StreamConsumer = yozefu_config.create_kafka_consumer()?;
        let metadata = consumer.fetch_metadata(None, Duration::from_secs(3))?;
        let topics = metadata
            .topics()
            .iter()
            .map(|t| t.name().to_string())
            .collect_vec();
        Ok(topics)
    }
}

//fn read_str<'a>(rdr: &'a mut Cursor<&[u8]>) -> Result<&'a str, Error> {
//    let len = (rdr.read_i16::<BigEndian>())? as usize;
//    let pos = rdr.position() as usize;
//    let slice = str::from_utf8(&rdr.get_ref()[pos..(pos + len)])?;
//    rdr.consume(len);
//    Ok(slice)
//}
//