redpanda_http/
subscriber.rs

1
2use std::{collections::HashMap, str::from_utf8, vec};
3use log::{debug, info, warn};
4use serde::{Serialize, Deserialize};
5
6const DEBUG: bool = true;
7
8use simplehttp::simplehttp::SimpleHttpClient;
9
10use crate::{base64_option, RedPandaError};
11
12#[derive(Serialize)]
13pub enum AutoOffsetReset {
14    #[serde(rename="earliest")]
15    Earliest,
16    #[serde(rename="latest")]
17    Latest,
18}
19
20#[derive(Serialize)]
21pub struct Consumer<'a> {
22    #[serde(skip_serializing_if = "Option::is_none")]
23    name: Option<&'a str>,
24    format: &'a str,
25    #[serde(rename="auto.commit.enable")]
26    auto_commit_enable: &'a str,
27    #[serde(rename="fetch.min.bytes")]
28    fetch_min_bytes: &'a str,
29    #[serde(rename="consumer.request.timeout.ms")]
30    consumer_request_timeout_ms: &'a str,
31    #[serde(rename="auto.offset.reset")]
32    auto_offset_reset: AutoOffsetReset,
33}
34
35impl<'a> Consumer<'a> {
36    fn create(name: Option<&'a str>)->Consumer<'a> {
37        Consumer { name, format: "json", auto_offset_reset: AutoOffsetReset::Earliest, auto_commit_enable: "false", fetch_min_bytes: "1", consumer_request_timeout_ms: "10000" }
38    }
39}
40
41#[derive(Serialize,Deserialize)]
42struct ReceivedOffset {
43    topic: String,
44    partition: u16,
45    offset: i64,
46}
47#[derive(Serialize,Deserialize)]
48struct ReceivedOffsets {
49    offsets: Vec<ReceivedOffset>,
50}
51
52#[derive(Serialize,Deserialize)]
53struct PartitionDefinition {
54    topic: String,
55    partition: u16,
56}
57#[derive(Serialize,Deserialize)]
58struct PartitionDefinitions {
59    partitions: Vec<PartitionDefinition>,
60}
61
62
63#[derive(Serialize,Debug)]
64struct CommitState {
65    topics: HashMap<String,HashMap<u16,i64>>
66}
67#[derive(Serialize,Debug)]
68pub struct CommitPartition {
69    topic: String,
70    partition: u16,
71    offset: i64
72}
73
74impl CommitState {
75    fn create()->CommitState {
76        CommitState { topics: HashMap::new() }
77    }
78
79    fn read_offset(&mut self, topic: String, partition: u16, offset: i64) {
80        let topic = self.topics.entry(topic).or_insert(HashMap::new());
81        topic.insert(partition,offset);
82    }
83
84    fn process_record(&mut self, record: &Record) {
85        let topic = self.topics.get_mut(&record.topic);
86        match topic {
87            None => {
88                let mut new_topic: HashMap::<u16,i64> = HashMap::new();
89                new_topic.insert(record.partition, record.offset + 1);
90                self.topics.insert(record.topic.to_owned(), new_topic);
91            }
92            Some(map) => {
93                map.insert(record.partition, record.offset + 1);
94            }
95        }
96    }
97
98    // TODO rewrite as flat_map
99    fn partition_list(&self)->Vec<CommitPartition> {
100        let mut result: Vec<CommitPartition> = vec![];
101        for (topic,partitions) in &self.topics {
102            for (partition,offset) in partitions.iter() {
103                result.push(CommitPartition {topic : topic.clone(), partition: *partition, offset: *offset});
104            }
105        }
106        result
107    }
108
109    fn clear(&mut self) {
110        self.topics.clear()
111    }
112}
113#[derive(Deserialize,Serialize)]
114#[derive(Clone,Debug)]
115struct ConsumerResponse {
116    instance_id: String,
117    base_uri: String
118}
119#[derive(Serialize)]
120struct SubscribeRequest<'a> {
121    topics: Vec<&'a str>
122}
123
124#[derive(Deserialize,Serialize)]
125pub struct Record {
126    pub topic: String,
127    #[serde(with="base64_option")]
128    pub key: Option<Vec<u8>>,
129    #[serde(with="base64_option")]
130    pub value: Option<Vec<u8>>,
131    pub partition: u16,
132    pub offset: i64
133}
134
135pub struct Subscriber {
136    inital_url: String,
137    group: String,
138    consumer_response: Option<ConsumerResponse>,
139    client: Box<dyn SimpleHttpClient>,
140    commit_state: CommitState,
141}
142
143impl From<(&str,u16)> for PartitionDefinition {
144    fn from((topic,partition): (&str,u16)) -> Self {
145        Self { topic: topic.to_owned(), partition}
146    }
147}
148
149impl From<Vec<(&str,u16)>> for PartitionDefinitions {
150    fn from(value: Vec<(&str,u16)>) -> Self {
151        PartitionDefinitions { partitions: value.into_iter().map(|e| e.into()).collect() }
152    }
153}
154
155impl Subscriber {
156    pub fn new(http_client: Box<dyn SimpleHttpClient>, inital_url: &str, group: &str, name: Option<&str>)->Result<Self,RedPandaError> {
157        let mut client = Self {inital_url: inital_url.to_owned(), group: group.to_owned(), consumer_response: Option::None, client: http_client, commit_state: CommitState::create()};
158        let consumer = Consumer::create(name);
159        let body = serde_json::to_vec(&consumer)
160            .map_err(|e| RedPandaError::nested("Error serializing JSON request",Box::new(e)))?
161        ;
162        let url = format!("{}consumers/{}", client.inital_url, client.group);
163        let headers = vec![("Content-Type","application/vnd.kafka.v2+json")];
164        if DEBUG {
165            debug!("Initializing using url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&consumer).unwrap());
166            debug!("Headers: {:?}",headers);
167        }
168        let result = client.client.post(&url, &headers, &body)
169            .map_err(|e| RedPandaError::nested("error creating consumer",Box::new(e)))?;
170        if DEBUG {
171            debug!("Result text:\n{}", from_utf8(result.as_slice())
172                .map_err(|e| RedPandaError::nested("Issues creating utf8",Box::new(e)))?);
173        }
174
175        client.consumer_response = Some(serde_json::from_slice(&result)
176            .map_err(|e| RedPandaError::nested("Error parsing JSON Red Panda reply",Box::new(e)))?);
177        debug!("Create consumer:\n{:?}",client.consumer_response);
178        // client.commit_state()?;
179        Ok(client)
180    }
181
182    pub fn register_topic(&mut self, topics: Vec<(&str,u16)>) ->  Result<(), RedPandaError> {
183        let tlist: Vec<&str> = topics.iter()
184            .map(|(topic,_)|(*topic).clone())
185            .collect();
186        let subscr = SubscribeRequest{topics: tlist};
187        let url = format!("{}/subscription",self.consumer_response.as_ref().unwrap().base_uri);
188        let body = serde_json::to_vec(&subscr)
189            .map_err(|e| RedPandaError::nested("Error serializing subscription request",Box::new(e)))?;
190        if DEBUG {
191            debug!("Registering topic using url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&subscr).unwrap())
192        }
193        let _ = self.client.post(&url, &[("Content-Type","application/vnd.kafka.v2+json")], &body)
194            .map_err(|e| RedPandaError::nested(&format!("error registering topic to: {:?}",url),Box::new(e)))?;
195
196        let l = topics.iter()
197            .flat_map(|(topic,partition_count)| (0..*partition_count).into_iter().map(|partition_number| (*topic,partition_number)) )
198            .collect();
199        self.create_commit_state(l)?;
200        // self.commit_state = CommitState::create();
201        self.commit_state()?;
202        Ok(())
203    }
204
205    pub fn process_record(&mut self, record: &Record) {
206        self.commit_state.process_record(record);
207
208        debug!("Current commit state:\n{:?}",self.commit_state);
209    }
210    // pub fn poll_thing<'a, T>(&mut self, bytecount: i32) -> Result<Vec<T>,RedPandaError> where for<'de> T: Deserialize<'de> + 'a {
211    //     let l: Vec<Record> = self.poll( bytecount)?;
212    //     let xx: Vec<T> = l.iter().filter_map(f) map(|f| serde_json::from_slice(&f.value[..])).collect();
213    // }
214
215    pub fn poll(&mut self, bytecount: i32) -> Result<Vec<Record>,RedPandaError> {
216        let url = format!("{}/records?timeout=10000&&max_bytes={}",self.consumer_response.as_ref().unwrap().base_uri,bytecount);
217        if DEBUG {
218            debug!("Calling get from url: {}",url);
219        }
220                // .header("Accept", "application/vnd.kafka.binary.v2+json")
221
222        let records = self.client.get(&url,&[("Accept", "application/vnd.kafka.binary.v2+json")])
223            .map_err(|e| RedPandaError::nested("error polling",Box::new(e)))?;
224        if DEBUG {
225            let text = String::from_utf8(records.clone()).unwrap();
226            warn!("Result poll body: {}",text);
227        }
228        let parsed:Vec<Record> = serde_json::from_slice(&records)
229            .map_err(|e|{RedPandaError::nested(&format!("Error parsing polling response. Response:\n{}",from_utf8(&records).unwrap_or("error")),Box::new(e))})?;
230        if DEBUG {
231            debug!("Polled from url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&parsed).unwrap());
232        }
233        Ok(parsed)
234    }
235    
236    fn create_commit_state(&mut self, offsets: Vec<(&str,u16)>) ->  Result<(), RedPandaError> {
237        // http://localhost:8082/consumers/test_group/instances/test_consumer/offsets
238        let base_uri = self.consumer_response.as_ref().unwrap().base_uri.as_str();
239        // let group = self.group;
240        debug!("BaseUrl: {}",base_uri);
241        // let instance = self.consumer_response.as_ref().unwrap().instance_id.as_str();
242        let url = format!("{}/offsets",base_uri);
243        let headers = [("Accept", "application/vnd.kafka.v2+json"),("Content-Type", "application/vnd.kafka.v2+json")];
244
245        let initial_offsets: PartitionDefinitions = offsets.into();
246        let body = serde_json::to_vec_pretty(&initial_offsets).map_err(|e| RedPandaError::nested("Error serializing initial offsets",Box::new(e)))?;
247        warn!("Get offsets Url: {}",url);
248        warn!("Get offsets Body: {}",from_utf8(&body).unwrap());
249        let res = self.client.get_with_body(&url, &headers, &body)
250            .map_err(|e| RedPandaError::nested("error reading state",Box::new(e)))?;
251        if DEBUG {
252            warn!("Get offsets Result:\n{}",from_utf8(&res).unwrap())
253        }
254    
255
256        let received_offsets: ReceivedOffsets = serde_json::from_slice(&res)
257            .map_err(|e| RedPandaError::nested("error reading state", Box::new(e)))?;
258        let mut commit_state = CommitState::create();
259        received_offsets.offsets.into_iter().for_each(|element| {
260            // floor the offset to 0. It appears to return -1 for new groups
261            commit_state.read_offset(element.topic, element.partition, element.offset.max(0))
262
263        });
264        self.commit_state = commit_state;
265        Ok(())
266        // for ofset in received_offsets
267
268
269    }
270
271    pub fn commit_state(&mut self) ->  Result<(), RedPandaError> {
272        let partitions = self.commit_state.partition_list();
273        info!("Committing topics: {:?}",&partitions);
274        let commits = HashMap::from([("partitions".to_owned(),partitions)]);
275        let body = serde_json::to_vec(&commits ).map_err(|e| RedPandaError::nested("Error serializing commit state",Box::new(e)))?;
276        // let value = from_utf8(&body).map_err(|_| RedPandaError("UTF8 error".to_owned()))?;
277        let url = format!("{}/offsets",self.consumer_response.as_ref().unwrap().base_uri);
278        if DEBUG {
279            warn!("Committing to url:{}\nBody:\n{}",url,serde_json::to_string_pretty(&commits).unwrap());
280        }
281        let _result = self.client.post(&url, &[("Accept","application/vnd.kafka.v2+json"),("Content-Type","application/vnd.kafka.v2+json")], &body)
282            .map_err(|e| RedPandaError::nested("error commiting state",Box::new(e)))?
283        ;
284        // I guess clear the commit state?
285        self.commit_state.clear();
286        Ok(())
287    }
288
289    pub fn instance_id(&self) -> Option<String> {
290        Some(self.consumer_response.as_ref().unwrap().instance_id.clone())
291    }
292}
293
294
295
#[cfg(test)]
#[cfg(feature = "reqwest")]
mod tests {
    /// Placeholder compiled only with the `reqwest` feature; it has no body
    /// yet and therefore always passes. TODO: exercise `Subscriber` here.
    #[test]
    fn test_reqwest() {}
}