redpanda_http_rust/
panda_client.rs

1use std::{collections::HashMap, str::from_utf8};
2
3use serde::{Serialize, Deserialize};
4
5const DEBUG: bool = true;
6
7#[derive(Serialize)]
8pub struct Consumer<'a> {
9    format: &'a str,
10    #[serde(rename="auto.offset.reset")]
11    auto_offset_reset: &'a str,
12    #[serde(rename="auto.commit.enable")]
13    auto_commit_enable: &'a str,
14    #[serde(rename="fetch.min.bytes")]
15    fetch_min_bytes: &'a str,
16    #[serde(rename="consumer.request.timeout.ms")]
17    consumer_request_timeout_ms: &'a str,
18}
19
20#[derive(Debug)]
21pub struct RedPandaError(pub String);
22pub trait RedPandaHttpClient {
23    // fn post(&mut self,  url: &str, headers: &mut Vec<(&str, &str)>, data: Vec<u8>)->Result<Vec<u8>,RedPandaError>;
24    fn post(&mut self, url: &str, headers: &mut Vec<(String, String)>, data: Vec<u8>)->Result<Vec<u8>,RedPandaError>;
25
26    fn get(&mut self, url: &str)->Result<Vec<u8>, RedPandaError>;
27}
28
29
30
31impl<'a> Consumer<'a> {
32    fn create()->Consumer<'a> {
33        Consumer { format: "json", auto_offset_reset: "earliest", auto_commit_enable: "false", fetch_min_bytes: "0", consumer_request_timeout_ms: "10000" }
34    }
35}
36
37#[derive(Serialize)]
38pub struct CommitState {
39    topics: HashMap<String,HashMap<u16,u64>>
40}
41#[derive(Serialize)]
42pub struct CommitPartition {
43    topic: String,
44    partition: u16,
45    offset: u64
46}
47
48impl CommitState {
49    pub fn create()->CommitState {
50        CommitState { topics: HashMap::new() }
51    }
52
53    pub fn process_record(&mut self, record: &Record) {
54        let topic = self.topics.get_mut(&record.topic);
55        match topic {
56            None => {
57                let mut new_topic: HashMap::<u16,u64> = HashMap::new();
58                new_topic.insert(record.partition, record.offset + 1);
59                self.topics.insert(record.topic.to_owned(), new_topic);
60            }
61            Some(map) => {
62                map.insert(record.partition, record.offset + 1);
63            }
64        }
65    }
66
67    pub fn partition_list(&self)->Vec<CommitPartition> {
68        let mut result: Vec<CommitPartition> = vec![];
69        for (topic,partitions) in &self.topics {
70            for (partition,offset) in partitions.iter() {
71                result.push(CommitPartition {topic : topic.clone(), partition: *partition, offset: *offset});
72            }
73        }
74        result
75    }
76}
77#[derive(Deserialize)]
78#[derive(Clone)]
79struct ConsumerResponse {
80    instance_id: String,
81    base_uri: String
82}
83#[derive(Serialize)]
84struct SubscribeRequest<'a> {
85    topics: Vec<&'a str>
86}
87
88#[derive(Deserialize,Serialize)]
89pub struct Record {
90    pub topic: String,
91    #[serde(with="base64")]
92    pub key: Vec<u8>,
93    #[serde(with="base64")]
94    pub value: Vec<u8>,
95    pub partition: u16,
96    pub offset: u64
97    
98}
99
100#[derive(Clone)]
101pub struct RedPandaClient {
102    inital_url: String,
103    group: String,
104    consumer_response: Option<ConsumerResponse>,
105}
106
107impl RedPandaClient {
108    pub fn new(http_client: &mut Box<dyn RedPandaHttpClient>, inital_url: &str, group: &str)->Result<RedPandaClient,RedPandaError> {
109        let mut client = RedPandaClient { inital_url: inital_url.to_owned(), group: group.to_owned(), consumer_response: Option::None};
110        let consumer = Consumer::create();
111        let body = serde_json::to_vec(&consumer)
112            .map_err(|_| RedPandaError("Error serializing JSON request".to_owned()))?
113        ;
114        let url = format!("{}{}", client.inital_url, client.group);
115        let mut headers = vec![("Content-Type".to_owned(),"application/vnd.kafka.v2+json".to_owned())];
116        if DEBUG {
117            println!("Initializing using url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&consumer).unwrap())
118        }
119        let result = http_client.post(&url, &mut headers, body)?;
120        if DEBUG {
121            println!("Result text:\n{}", from_utf8(result.as_slice()).map_err(|_| RedPandaError("Issues creating utf8".to_owned()))?);
122        }
123        client.consumer_response = Some(serde_json::from_slice(&result).map_err(|_| RedPandaError("Error parsing JSON Red Panda reply".to_owned()))?);
124        Ok(client)
125    }
126
127    pub fn register_topic(&mut self, client: &mut Box<dyn RedPandaHttpClient>, topics: Vec<&str>) ->  Result<(), RedPandaError> {
128        let subscr = SubscribeRequest{topics};
129        let url = format!("{}/subscription",self.consumer_response.as_ref().unwrap().base_uri);
130        let body = serde_json::to_vec(&subscr)
131            .map_err(|_| RedPandaError("Error serializing subscription request".to_owned()))?;
132        if DEBUG {
133            println!("Registering topic using url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&subscr).unwrap())
134        }
135        let _ = client.post(&url, &mut Default::default(), body);
136        Ok(())
137    }
138
139    pub fn poll(&mut self, client: &mut Box<dyn RedPandaHttpClient>, bytecount: i32) -> Result<Vec<Record>,RedPandaError> {
140        let url = format!("{}/records?timeout=10000&&max_bytes={}",self.consumer_response.as_ref().unwrap().base_uri,bytecount);
141        if DEBUG {
142            println!("Calling get from url: {}",url);
143        }
144        let records = client.get(&url)?;
145        let parsed:Vec<Record> = serde_json::from_slice(&records).map_err(|_| RedPandaError("Error parsing polling response".to_owned()))?;
146        if DEBUG {
147            println!("Polled from url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&parsed).unwrap());
148        }
149        Ok(parsed)
150    }
151    
152    
153    pub fn commit_state(&mut self, client: &mut Box<dyn RedPandaHttpClient>, state: &CommitState) ->  Result<(), RedPandaError> {
154        let partitions = state.partition_list();
155        let commits = HashMap::from([("partitions".to_owned(),partitions)]);
156        let body = serde_json::to_vec(&commits ).map_err(|_| RedPandaError("Error serializing commit state".to_owned()))?;
157        // let value = from_utf8(&body).map_err(|_| RedPandaError("UTF8 error".to_owned()))?;
158        let url = format!("{}/offsets",self.consumer_response.as_ref().unwrap().base_uri);
159        if DEBUG {
160            println!("Committing to url:{}\nBody:\n{}",url,serde_json::to_string_pretty(&commits).unwrap());
161        }
162        let _ = client.post(&url, &mut Default::default(), body);
163        Ok(())
164    }
165
166    pub fn instance_id(&self) -> Option<String> {
167        Some(self.consumer_response.as_ref().unwrap().instance_id.clone())
168    }
169}
170
171mod base64 {
172    use base64::Engine;
173    use serde::{Serialize, Deserialize};
174    use serde::{Deserializer, Serializer};
175
176    pub fn serialize<S: Serializer>(v: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
177        // base64::engine::GeneralPurpose
178        // BASE64_STANDARD::encode
179        let base64 = base64::prelude::BASE64_STANDARD.encode(v);
180        String::serialize(&base64, s)
181    }
182    
183
184    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
185        let base64 = String::deserialize(d)?;
186        base64::prelude::BASE64_STANDARD.decode(base64.as_bytes())
187            .map_err(serde::de::Error::custom)
188    }
189}