1
2use std::{collections::HashMap, str::from_utf8, vec};
3use log::{debug, info, warn};
4use serde::{Serialize, Deserialize};
5
/// Compile-time switch guarding the verbose request/response logging blocks
/// (`if DEBUG { ... }`) in this module.
const DEBUG: bool = true;
7
8use simplehttp::simplehttp::SimpleHttpClient;
9
10use crate::{base64_option, RedPandaError};
11
12#[derive(Serialize)]
13pub enum AutoOffsetReset {
14 #[serde(rename="earliest")]
15 Earliest,
16 #[serde(rename="latest")]
17 Latest,
18}
19
20#[derive(Serialize)]
21pub struct Consumer<'a> {
22 #[serde(skip_serializing_if = "Option::is_none")]
23 name: Option<&'a str>,
24 format: &'a str,
25 #[serde(rename="auto.commit.enable")]
26 auto_commit_enable: &'a str,
27 #[serde(rename="fetch.min.bytes")]
28 fetch_min_bytes: &'a str,
29 #[serde(rename="consumer.request.timeout.ms")]
30 consumer_request_timeout_ms: &'a str,
31 #[serde(rename="auto.offset.reset")]
32 auto_offset_reset: AutoOffsetReset,
33}
34
35impl<'a> Consumer<'a> {
36 fn create(name: Option<&'a str>)->Consumer<'a> {
37 Consumer { name, format: "json", auto_offset_reset: AutoOffsetReset::Earliest, auto_commit_enable: "false", fetch_min_bytes: "1", consumer_request_timeout_ms: "10000" }
38 }
39}
40
41#[derive(Serialize,Deserialize)]
42struct ReceivedOffset {
43 topic: String,
44 partition: u16,
45 offset: i64,
46}
47#[derive(Serialize,Deserialize)]
48struct ReceivedOffsets {
49 offsets: Vec<ReceivedOffset>,
50}
51
52#[derive(Serialize,Deserialize)]
53struct PartitionDefinition {
54 topic: String,
55 partition: u16,
56}
57#[derive(Serialize,Deserialize)]
58struct PartitionDefinitions {
59 partitions: Vec<PartitionDefinition>,
60}
61
62
63#[derive(Serialize,Debug)]
64struct CommitState {
65 topics: HashMap<String,HashMap<u16,i64>>
66}
67#[derive(Serialize,Debug)]
68pub struct CommitPartition {
69 topic: String,
70 partition: u16,
71 offset: i64
72}
73
74impl CommitState {
75 fn create()->CommitState {
76 CommitState { topics: HashMap::new() }
77 }
78
79 fn read_offset(&mut self, topic: String, partition: u16, offset: i64) {
80 let topic = self.topics.entry(topic).or_insert(HashMap::new());
81 topic.insert(partition,offset);
82 }
83
84 fn process_record(&mut self, record: &Record) {
85 let topic = self.topics.get_mut(&record.topic);
86 match topic {
87 None => {
88 let mut new_topic: HashMap::<u16,i64> = HashMap::new();
89 new_topic.insert(record.partition, record.offset + 1);
90 self.topics.insert(record.topic.to_owned(), new_topic);
91 }
92 Some(map) => {
93 map.insert(record.partition, record.offset + 1);
94 }
95 }
96 }
97
98 fn partition_list(&self)->Vec<CommitPartition> {
100 let mut result: Vec<CommitPartition> = vec![];
101 for (topic,partitions) in &self.topics {
102 for (partition,offset) in partitions.iter() {
103 result.push(CommitPartition {topic : topic.clone(), partition: *partition, offset: *offset});
104 }
105 }
106 result
107 }
108
109 fn clear(&mut self) {
110 self.topics.clear()
111 }
112}
113#[derive(Deserialize,Serialize)]
114#[derive(Clone,Debug)]
115struct ConsumerResponse {
116 instance_id: String,
117 base_uri: String
118}
119#[derive(Serialize)]
120struct SubscribeRequest<'a> {
121 topics: Vec<&'a str>
122}
123
124#[derive(Deserialize,Serialize)]
125pub struct Record {
126 pub topic: String,
127 #[serde(with="base64_option")]
128 pub key: Option<Vec<u8>>,
129 #[serde(with="base64_option")]
130 pub value: Option<Vec<u8>>,
131 pub partition: u16,
132 pub offset: i64
133}
134
135pub struct Subscriber {
136 inital_url: String,
137 group: String,
138 consumer_response: Option<ConsumerResponse>,
139 client: Box<dyn SimpleHttpClient>,
140 commit_state: CommitState,
141}
142
143impl From<(&str,u16)> for PartitionDefinition {
144 fn from((topic,partition): (&str,u16)) -> Self {
145 Self { topic: topic.to_owned(), partition}
146 }
147}
148
149impl From<Vec<(&str,u16)>> for PartitionDefinitions {
150 fn from(value: Vec<(&str,u16)>) -> Self {
151 PartitionDefinitions { partitions: value.into_iter().map(|e| e.into()).collect() }
152 }
153}
154
155impl Subscriber {
156 pub fn new(http_client: Box<dyn SimpleHttpClient>, inital_url: &str, group: &str, name: Option<&str>)->Result<Self,RedPandaError> {
157 let mut client = Self {inital_url: inital_url.to_owned(), group: group.to_owned(), consumer_response: Option::None, client: http_client, commit_state: CommitState::create()};
158 let consumer = Consumer::create(name);
159 let body = serde_json::to_vec(&consumer)
160 .map_err(|e| RedPandaError::nested("Error serializing JSON request",Box::new(e)))?
161 ;
162 let url = format!("{}consumers/{}", client.inital_url, client.group);
163 let headers = vec![("Content-Type","application/vnd.kafka.v2+json")];
164 if DEBUG {
165 debug!("Initializing using url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&consumer).unwrap());
166 debug!("Headers: {:?}",headers);
167 }
168 let result = client.client.post(&url, &headers, &body)
169 .map_err(|e| RedPandaError::nested("error creating consumer",Box::new(e)))?;
170 if DEBUG {
171 debug!("Result text:\n{}", from_utf8(result.as_slice())
172 .map_err(|e| RedPandaError::nested("Issues creating utf8",Box::new(e)))?);
173 }
174
175 client.consumer_response = Some(serde_json::from_slice(&result)
176 .map_err(|e| RedPandaError::nested("Error parsing JSON Red Panda reply",Box::new(e)))?);
177 debug!("Create consumer:\n{:?}",client.consumer_response);
178 Ok(client)
180 }
181
182 pub fn register_topic(&mut self, topics: Vec<(&str,u16)>) -> Result<(), RedPandaError> {
183 let tlist: Vec<&str> = topics.iter()
184 .map(|(topic,_)|(*topic).clone())
185 .collect();
186 let subscr = SubscribeRequest{topics: tlist};
187 let url = format!("{}/subscription",self.consumer_response.as_ref().unwrap().base_uri);
188 let body = serde_json::to_vec(&subscr)
189 .map_err(|e| RedPandaError::nested("Error serializing subscription request",Box::new(e)))?;
190 if DEBUG {
191 debug!("Registering topic using url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&subscr).unwrap())
192 }
193 let _ = self.client.post(&url, &[("Content-Type","application/vnd.kafka.v2+json")], &body)
194 .map_err(|e| RedPandaError::nested(&format!("error registering topic to: {:?}",url),Box::new(e)))?;
195
196 let l = topics.iter()
197 .flat_map(|(topic,partition_count)| (0..*partition_count).into_iter().map(|partition_number| (*topic,partition_number)) )
198 .collect();
199 self.create_commit_state(l)?;
200 self.commit_state()?;
202 Ok(())
203 }
204
205 pub fn process_record(&mut self, record: &Record) {
206 self.commit_state.process_record(record);
207
208 debug!("Current commit state:\n{:?}",self.commit_state);
209 }
210 pub fn poll(&mut self, bytecount: i32) -> Result<Vec<Record>,RedPandaError> {
216 let url = format!("{}/records?timeout=10000&&max_bytes={}",self.consumer_response.as_ref().unwrap().base_uri,bytecount);
217 if DEBUG {
218 debug!("Calling get from url: {}",url);
219 }
220 let records = self.client.get(&url,&[("Accept", "application/vnd.kafka.binary.v2+json")])
223 .map_err(|e| RedPandaError::nested("error polling",Box::new(e)))?;
224 if DEBUG {
225 let text = String::from_utf8(records.clone()).unwrap();
226 warn!("Result poll body: {}",text);
227 }
228 let parsed:Vec<Record> = serde_json::from_slice(&records)
229 .map_err(|e|{RedPandaError::nested(&format!("Error parsing polling response. Response:\n{}",from_utf8(&records).unwrap_or("error")),Box::new(e))})?;
230 if DEBUG {
231 debug!("Polled from url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&parsed).unwrap());
232 }
233 Ok(parsed)
234 }
235
236 fn create_commit_state(&mut self, offsets: Vec<(&str,u16)>) -> Result<(), RedPandaError> {
237 let base_uri = self.consumer_response.as_ref().unwrap().base_uri.as_str();
239 debug!("BaseUrl: {}",base_uri);
241 let url = format!("{}/offsets",base_uri);
243 let headers = [("Accept", "application/vnd.kafka.v2+json"),("Content-Type", "application/vnd.kafka.v2+json")];
244
245 let initial_offsets: PartitionDefinitions = offsets.into();
246 let body = serde_json::to_vec_pretty(&initial_offsets).map_err(|e| RedPandaError::nested("Error serializing initial offsets",Box::new(e)))?;
247 warn!("Get offsets Url: {}",url);
248 warn!("Get offsets Body: {}",from_utf8(&body).unwrap());
249 let res = self.client.get_with_body(&url, &headers, &body)
250 .map_err(|e| RedPandaError::nested("error reading state",Box::new(e)))?;
251 if DEBUG {
252 warn!("Get offsets Result:\n{}",from_utf8(&res).unwrap())
253 }
254
255
256 let received_offsets: ReceivedOffsets = serde_json::from_slice(&res)
257 .map_err(|e| RedPandaError::nested("error reading state", Box::new(e)))?;
258 let mut commit_state = CommitState::create();
259 received_offsets.offsets.into_iter().for_each(|element| {
260 commit_state.read_offset(element.topic, element.partition, element.offset.max(0))
262
263 });
264 self.commit_state = commit_state;
265 Ok(())
266 }
270
271 pub fn commit_state(&mut self) -> Result<(), RedPandaError> {
272 let partitions = self.commit_state.partition_list();
273 info!("Committing topics: {:?}",&partitions);
274 let commits = HashMap::from([("partitions".to_owned(),partitions)]);
275 let body = serde_json::to_vec(&commits ).map_err(|e| RedPandaError::nested("Error serializing commit state",Box::new(e)))?;
276 let url = format!("{}/offsets",self.consumer_response.as_ref().unwrap().base_uri);
278 if DEBUG {
279 warn!("Committing to url:{}\nBody:\n{}",url,serde_json::to_string_pretty(&commits).unwrap());
280 }
281 let _result = self.client.post(&url, &[("Accept","application/vnd.kafka.v2+json"),("Content-Type","application/vnd.kafka.v2+json")], &body)
282 .map_err(|e| RedPandaError::nested("error commiting state",Box::new(e)))?
283 ;
284 self.commit_state.clear();
286 Ok(())
287 }
288
289 pub fn instance_id(&self) -> Option<String> {
290 Some(self.consumer_response.as_ref().unwrap().instance_id.clone())
291 }
292}
293
294
295
#[cfg(all(test, feature = "reqwest"))]
mod tests {
    /// Placeholder for tests that need the `reqwest` feature; currently
    /// asserts nothing.
    #[test]
    fn test_reqwest() {}
}