// streambed_kafka/lib.rs

1#![doc = include_str!("../README.md")]
2
3pub mod args;
4
5use std::collections::HashMap;
6use std::pin::Pin;
7use std::time::Duration;
8
9use async_stream::stream;
10use async_trait::async_trait;
11use log::{debug, trace};
12use metrics::increment_counter;
13use reqwest::Certificate;
14use reqwest::{Client, Url};
15use serde::Deserialize;
16use serde::Serialize;
17use streambed::commit_log::CommitLog;
18use streambed::commit_log::Consumer;
19use streambed::commit_log::ConsumerOffset;
20use streambed::commit_log::ConsumerRecord;
21use streambed::commit_log::PartitionOffsets;
22use streambed::commit_log::ProducedOffset;
23use streambed::commit_log::ProducerError;
24use streambed::commit_log::ProducerRecord;
25use streambed::commit_log::Subscription;
26use streambed::commit_log::Topic;
27use streambed::delayer::Delayer;
28use tokio::time;
29use tokio_stream::Stream;
30
/// A commit log holds topics and can be appended to and tailed.
///
/// This implementation talks to a Kafka REST service over HTTP/JSON,
/// joining endpoint paths onto a configured base URL.
#[derive(Clone)]
pub struct KafkaRestCommitLog {
    // Shared HTTP client used for all REST calls; reqwest clients are
    // cheap to clone as they share an internal connection pool.
    client: Client,
    // Base URL of the Kafka REST service.
    server: Url,
}
37
// Label keys attached to the `metrics` counters emitted by this module.
const CONSUMER_GROUP_NAME_LABEL: &str = "consumer_group_name";
const TOPIC_LABEL: &str = "topic";
40
/// JSON request body POSTed to `/topics/{topic}` when producing records.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct ProduceRequest {
    pub records: Vec<ProducerRecord>,
}
45
/// JSON reply body returned by the server after producing records; one
/// offset is expected per record produced.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct ProduceReply {
    pub offsets: Vec<ProducedOffset>,
}
50
51type OffsetMap = HashMap<(Topic, u32), u64>;
52
53impl KafkaRestCommitLog {
54    /// Establish a new commit log session.
55    pub fn new(server: Url, server_cert: Option<Certificate>, tls_insecure: bool) -> Self {
56        let client = Client::builder().danger_accept_invalid_certs(tls_insecure);
57
58        let client = if let Some(cert) = server_cert {
59            client.add_root_certificate(cert)
60        } else {
61            client
62        };
63
64        Self {
65            client: client.build().unwrap(),
66            server,
67        }
68    }
69}
70
71#[async_trait]
72impl CommitLog for KafkaRestCommitLog {
73    async fn offsets(&self, topic: Topic, partition: u32) -> Option<PartitionOffsets> {
74        let mut delayer = Delayer::default();
75        loop {
76            match self
77                .client
78                .get(
79                    self.server
80                        .join(&format!("/topics/{topic}/partitions/{partition}/offsets"))
81                        .unwrap(),
82                )
83                .send()
84                .await
85            {
86                Ok(response) => {
87                    if response.status().is_success() {
88                        trace!("Retrieved offsets for: {} {}", topic, partition);
89                        increment_counter!("offset_replies",  TOPIC_LABEL => topic.to_string());
90                        break response.json::<PartitionOffsets>().await.ok();
91                    } else {
92                        debug!(
93                            "Commit log failure status while retrieving offsets: {:?}",
94                            response.status()
95                        );
96                        increment_counter!("offsets_other_reply_failures");
97                        break None;
98                    }
99                }
100                Err(e) => {
101                    debug!(
102                        "Commit log is unavailable while retrieving offsets. Error: {:?}",
103                        e
104                    );
105                    increment_counter!("offsets_unavailables");
106                }
107            }
108            delayer.delay().await;
109        }
110    }
111
112    async fn produce(&self, record: ProducerRecord) -> Result<ProducedOffset, ProducerError> {
113        let mut delayer = Delayer::default();
114        loop {
115            match self
116                .client
117                .post(
118                    self.server
119                        .join(&format!("/topics/{}", record.topic))
120                        .unwrap(),
121                )
122                .json(&ProduceRequest {
123                    records: vec![record.clone()],
124                })
125                .send()
126                .await
127            {
128                Ok(response) => {
129                    if response.status().is_success() {
130                        trace!("Produced record: {:?}", record);
131                        increment_counter!("producer_replies",  TOPIC_LABEL => record.topic.to_string());
132                        break response
133                            .json::<ProduceReply>()
134                            .await
135                            .map_err(|_| ProducerError::CannotProduce)
136                            .and_then(|r| {
137                                r.offsets.first().map(|o| o.to_owned()).ok_or_else(|| {
138                                    debug!(
139                                        "Commit log failure reply with no offset while producing"
140                                    );
141                                    increment_counter!("producer_other_reply_failures");
142                                    ProducerError::CannotProduce
143                                })
144                            });
145                    } else {
146                        debug!(
147                            "Commit log failure status while producing: {:?}",
148                            response.status()
149                        );
150                        increment_counter!("producer_other_reply_failures");
151                        break Err(ProducerError::CannotProduce);
152                    }
153                }
154                Err(e) => {
155                    debug!("Commit log is unavailable while producing. Error: {:?}", e);
156                    increment_counter!("producer_unavailables");
157                }
158            }
159            delayer.delay().await;
160        }
161    }
162
163    /// Subscribe to one or more topics for a given consumer group
164    /// having committed zero or more topics. Connections are
165    /// retried if they cannot be established, or become lost.
166    /// Once a connection is established then records are streamed
167    /// back indefinitely unless an idle timeout argument is provided.
168    /// In the case of an idle timeout, if no record is received
169    /// within that period, None is returned to end the stream.
170    fn scoped_subscribe<'a>(
171        &'a self,
172        consumer_group_name: &str,
173        offsets: Vec<ConsumerOffset>,
174        subscriptions: Vec<Subscription>,
175        idle_timeout: Option<Duration>,
176    ) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>> {
177        let consumer_group_name = consumer_group_name.to_string();
178        let mut offsets: OffsetMap = offsets
179            .iter()
180            .map(|e| ((e.topic.to_owned(), e.partition), e.offset))
181            .collect::<OffsetMap>();
182        let subscriptions: Vec<Subscription> = subscriptions.iter().map(|e| e.to_owned()).collect();
183        Box::pin(stream!({
184            let mut delayer = Delayer::default();
185            'stream_loop: loop {
186                increment_counter!("consumer_group_requests", CONSUMER_GROUP_NAME_LABEL => consumer_group_name.to_string());
187                let consumer = Consumer {
188                    offsets: offsets
189                        .clone()
190                        .iter()
191                        .map(|((topic, partition), offset)| ConsumerOffset {
192                            offset: *offset,
193                            partition: *partition,
194                            topic: topic.clone(),
195                        })
196                        .collect(),
197                    subscriptions: subscriptions.clone(),
198                };
199                let response = self
200                    .client
201                    .post(
202                        self.server
203                            .join(&format!("/consumers/{}", consumer_group_name))
204                            .unwrap(),
205                    )
206                    .json(&consumer)
207                    .send()
208                    .await;
209                match response {
210                    Ok(mut r) => loop {
211                        let chunk = if let Some(it) = idle_timeout {
212                            match time::timeout(it, r.chunk()).await {
213                                Ok(c) => c,
214                                Err(_) => break 'stream_loop,
215                            }
216                        } else {
217                            r.chunk().await
218                        };
219                        match chunk {
220                            Ok(Some(c)) => {
221                                if let Ok(record) = serde_json::from_slice::<ConsumerRecord>(&c) {
222                                    trace!("Received record: {:?}", record);
223                                    let topic = record.topic.to_owned();
224                                    let partition = record.partition;
225                                    let record_offset = record.offset;
226                                    increment_counter!("consumer_group_replies", CONSUMER_GROUP_NAME_LABEL => consumer_group_name.to_string(), TOPIC_LABEL => topic.to_string());
227                                    yield record;
228
229                                    let _ = offsets.insert((topic, partition), record_offset);
230                                } else {
231                                    debug!("Unable to decode record");
232                                }
233                            }
234                            Ok(None) => {
235                                debug!("Unable to receive chunk");
236                                delayer = Delayer::default();
237                                continue 'stream_loop;
238                            }
239                            Err(e) => debug!("Error receiving chunk {:?}", e),
240                        }
241                    },
242                    Err(e) => {
243                        debug!(
244                            "Commit log is unavailable while subscribing. Error: {:?}",
245                            e
246                        );
247                        increment_counter!("consumer_group_request_failures", CONSUMER_GROUP_NAME_LABEL => consumer_group_name.to_string());
248                    }
249                }
250                delayer.delay().await;
251            }
252        }))
253    }
254}