1#![doc = include_str!("../README.md")]
2
3pub mod args;
4
5use std::collections::HashMap;
6use std::pin::Pin;
7use std::time::Duration;
8
9use async_stream::stream;
10use async_trait::async_trait;
11use log::{debug, trace};
12use metrics::increment_counter;
13use reqwest::Certificate;
14use reqwest::{Client, Url};
15use serde::Deserialize;
16use serde::Serialize;
17use streambed::commit_log::CommitLog;
18use streambed::commit_log::Consumer;
19use streambed::commit_log::ConsumerOffset;
20use streambed::commit_log::ConsumerRecord;
21use streambed::commit_log::PartitionOffsets;
22use streambed::commit_log::ProducedOffset;
23use streambed::commit_log::ProducerError;
24use streambed::commit_log::ProducerRecord;
25use streambed::commit_log::Subscription;
26use streambed::commit_log::Topic;
27use streambed::delayer::Delayer;
28use tokio::time;
29use tokio_stream::Stream;
30
/// A `CommitLog` implementation that talks to a Kafka REST endpoint
/// over HTTP(S) using the request paths seen below
/// (`/topics/...`, `/consumers/...`).
#[derive(Clone)]
pub struct KafkaRestCommitLog {
    // HTTP client used for all requests to the REST server.
    client: Client,
    // Base URL of the REST server; request paths are joined onto it.
    server: Url,
}
37
// Metric label key carrying the consumer group name on counters.
const CONSUMER_GROUP_NAME_LABEL: &str = "consumer_group_name";
// Metric label key carrying the topic name on counters.
const TOPIC_LABEL: &str = "topic";
40
/// JSON body POSTed to `/topics/{topic}` when producing; `produce`
/// always sends exactly one record per request.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct ProduceRequest {
    pub records: Vec<ProducerRecord>,
}
45
/// JSON reply to a produce request; `produce` takes the first offset
/// (one record was sent, so one offset is expected back).
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct ProduceReply {
    pub offsets: Vec<ProducedOffset>,
}
50
/// Latest consumed offset per (topic, partition) — used by
/// `scoped_subscribe` so a re-subscription resumes from where the
/// previous connection left off.
type OffsetMap = HashMap<(Topic, u32), u64>;
52
53impl KafkaRestCommitLog {
54 pub fn new(server: Url, server_cert: Option<Certificate>, tls_insecure: bool) -> Self {
56 let client = Client::builder().danger_accept_invalid_certs(tls_insecure);
57
58 let client = if let Some(cert) = server_cert {
59 client.add_root_certificate(cert)
60 } else {
61 client
62 };
63
64 Self {
65 client: client.build().unwrap(),
66 server,
67 }
68 }
69}
70
#[async_trait]
impl CommitLog for KafkaRestCommitLog {
    /// Retrieve the offsets of a topic partition from
    /// `/topics/{topic}/partitions/{partition}/offsets`.
    ///
    /// Transport-level failures (server unreachable) are retried forever,
    /// pausing via the `Delayer` between attempts. An unsuccessful HTTP
    /// status, or a body that cannot be decoded as `PartitionOffsets`,
    /// resolves to `None` without retrying.
    async fn offsets(&self, topic: Topic, partition: u32) -> Option<PartitionOffsets> {
        let mut delayer = Delayer::default();
        loop {
            match self
                .client
                .get(
                    self.server
                        // The path is formatted from known-safe parts, so
                        // join is not expected to fail here.
                        .join(&format!("/topics/{topic}/partitions/{partition}/offsets"))
                        .unwrap(),
                )
                .send()
                .await
            {
                Ok(response) => {
                    if response.status().is_success() {
                        trace!("Retrieved offsets for: {} {}", topic, partition);
                        increment_counter!("offset_replies", TOPIC_LABEL => topic.to_string());
                        // A body that fails to decode becomes None (.ok()),
                        // the same as an error status — no retry.
                        break response.json::<PartitionOffsets>().await.ok();
                    } else {
                        debug!(
                            "Commit log failure status while retrieving offsets: {:?}",
                            response.status()
                        );
                        increment_counter!("offsets_other_reply_failures");
                        break None;
                    }
                }
                Err(e) => {
                    // Send failed (e.g. connection refused) — fall through
                    // to the delay below and retry.
                    debug!(
                        "Commit log is unavailable while retrieving offsets. Error: {:?}",
                        e
                    );
                    increment_counter!("offsets_unavailables");
                }
            }
            // Only reached on a transport error: wait, then try again.
            delayer.delay().await;
        }
    }

    /// Produce a single record by POSTing it to `/topics/{topic}`.
    ///
    /// The record is wrapped in a one-element `ProduceRequest`; on success
    /// the first offset of the `ProduceReply` is returned. Transport
    /// failures are retried forever with a delay between attempts. An HTTP
    /// error status, an undecodable reply, or a reply carrying no offsets
    /// all resolve to `Err(ProducerError::CannotProduce)`.
    async fn produce(&self, record: ProducerRecord) -> Result<ProducedOffset, ProducerError> {
        let mut delayer = Delayer::default();
        loop {
            match self
                .client
                .post(
                    self.server
                        .join(&format!("/topics/{}", record.topic))
                        .unwrap(),
                )
                .json(&ProduceRequest {
                    // Cloned because the record is also used for logging
                    // below and may be re-sent on retry.
                    records: vec![record.clone()],
                })
                .send()
                .await
            {
                Ok(response) => {
                    if response.status().is_success() {
                        trace!("Produced record: {:?}", record);
                        increment_counter!("producer_replies", TOPIC_LABEL => record.topic.to_string());
                        break response
                            .json::<ProduceReply>()
                            .await
                            .map_err(|_| ProducerError::CannotProduce)
                            .and_then(|r| {
                                // One record was sent, so exactly one offset
                                // is expected back; none is a failure.
                                r.offsets.first().map(|o| o.to_owned()).ok_or_else(|| {
                                    debug!(
                                        "Commit log failure reply with no offset while producing"
                                    );
                                    increment_counter!("producer_other_reply_failures");
                                    ProducerError::CannotProduce
                                })
                            });
                    } else {
                        debug!(
                            "Commit log failure status while producing: {:?}",
                            response.status()
                        );
                        increment_counter!("producer_other_reply_failures");
                        break Err(ProducerError::CannotProduce);
                    }
                }
                Err(e) => {
                    // Send failed — fall through to the delay and retry.
                    debug!("Commit log is unavailable while producing. Error: {:?}", e);
                    increment_counter!("producer_unavailables");
                }
            }
            // Only reached on a transport error: wait, then try again.
            delayer.delay().await;
        }
    }

    /// Subscribe as `consumer_group_name` to the given subscriptions,
    /// returning a stream of consumer records.
    ///
    /// The stream repeatedly POSTs a `Consumer` descriptor (current
    /// offsets + subscriptions) to `/consumers/{consumer_group_name}` and
    /// then reads the chunked response body, decoding each chunk as one
    /// JSON `ConsumerRecord`.
    /// NOTE(review): this assumes the server frames exactly one record per
    /// HTTP chunk — confirm against the server implementation.
    ///
    /// The offset of every yielded record is remembered per
    /// (topic, partition) so that a re-subscription after the response
    /// body ends resumes from the last consumed position.
    ///
    /// If `idle_timeout` is supplied and no chunk arrives within that
    /// duration, the stream terminates.
    fn scoped_subscribe<'a>(
        &'a self,
        consumer_group_name: &str,
        offsets: Vec<ConsumerOffset>,
        subscriptions: Vec<Subscription>,
        idle_timeout: Option<Duration>,
    ) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>> {
        // Own copies of the inputs so the generated stream is
        // self-contained and not tied to the caller's borrows.
        let consumer_group_name = consumer_group_name.to_string();
        let mut offsets: OffsetMap = offsets
            .iter()
            .map(|e| ((e.topic.to_owned(), e.partition), e.offset))
            .collect::<OffsetMap>();
        let subscriptions: Vec<Subscription> = subscriptions.iter().map(|e| e.to_owned()).collect();
        Box::pin(stream!({
            let mut delayer = Delayer::default();
            'stream_loop: loop {
                increment_counter!("consumer_group_requests", CONSUMER_GROUP_NAME_LABEL => consumer_group_name.to_string());
                // Rebuild the consumer descriptor on every (re)subscription
                // so it carries the offsets consumed so far.
                let consumer = Consumer {
                    offsets: offsets
                        .clone()
                        .iter()
                        .map(|((topic, partition), offset)| ConsumerOffset {
                            offset: *offset,
                            partition: *partition,
                            topic: topic.clone(),
                        })
                        .collect(),
                    subscriptions: subscriptions.clone(),
                };
                let response = self
                    .client
                    .post(
                        self.server
                            .join(&format!("/consumers/{}", consumer_group_name))
                            .unwrap(),
                    )
                    .json(&consumer)
                    .send()
                    .await;
                match response {
                    // Connected: read chunks off the response body.
                    Ok(mut r) => loop {
                        let chunk = if let Some(it) = idle_timeout {
                            match time::timeout(it, r.chunk()).await {
                                Ok(c) => c,
                                // Idle for too long — end the whole stream.
                                Err(_) => break 'stream_loop,
                            }
                        } else {
                            r.chunk().await
                        };
                        match chunk {
                            Ok(Some(c)) => {
                                if let Ok(record) = serde_json::from_slice::<ConsumerRecord>(&c) {
                                    trace!("Received record: {:?}", record);
                                    let topic = record.topic.to_owned();
                                    let partition = record.partition;
                                    let record_offset = record.offset;
                                    increment_counter!("consumer_group_replies", CONSUMER_GROUP_NAME_LABEL => consumer_group_name.to_string(), TOPIC_LABEL => topic.to_string());
                                    yield record;

                                    // Record the consumed position only after
                                    // the record has been yielded downstream.
                                    let _ = offsets.insert((topic, partition), record_offset);
                                } else {
                                    debug!("Unable to decode record");
                                }
                            }
                            Ok(None) => {
                                // Response body ended: reset back-off state
                                // and resubscribe immediately (`continue`
                                // skips the delay at the bottom).
                                debug!("Unable to receive chunk");
                                delayer = Delayer::default();
                                continue 'stream_loop;
                            }
                            // Chunk-level error: log and keep reading from
                            // the same response.
                            Err(e) => debug!("Error receiving chunk {:?}", e),
                        }
                    },
                    Err(e) => {
                        debug!(
                            "Commit log is unavailable while subscribing. Error: {:?}",
                            e
                        );
                        increment_counter!("consumer_group_request_failures", CONSUMER_GROUP_NAME_LABEL => consumer_group_name.to_string());
                    }
                }
                // Only reached when the subscribe POST itself failed:
                // wait, then resubscribe.
                delayer.delay().await;
            }
        }))
    }
}