redpanda_http_rust/
panda_client.rs1use std::{collections::HashMap, str::from_utf8};
2
3use serde::{Serialize, Deserialize};
4
/// Compile-time switch for the diagnostic `println!` output in this file.
///
/// While `true`, `RedPandaClient` prints the URLs it calls together with the
/// request and response bodies to stdout.
const DEBUG: bool = true;
6
7#[derive(Serialize)]
8pub struct Consumer<'a> {
9 format: &'a str,
10 #[serde(rename="auto.offset.reset")]
11 auto_offset_reset: &'a str,
12 #[serde(rename="auto.commit.enable")]
13 auto_commit_enable: &'a str,
14 #[serde(rename="fetch.min.bytes")]
15 fetch_min_bytes: &'a str,
16 #[serde(rename="consumer.request.timeout.ms")]
17 consumer_request_timeout_ms: &'a str,
18}
19
/// Error type returned by the fallible operations in this file.
///
/// Wraps a human-readable message. The message is the public tuple field, so
/// callers can read it directly through `.0`.
#[derive(Debug)]
pub struct RedPandaError(pub String);

/// Displays the wrapped message verbatim, without any prefix or decoration.
impl std::fmt::Display for RedPandaError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

/// Makes `RedPandaError` a standard error so it can be boxed as
/// `Box<dyn std::error::Error>` and propagated with `?` by callers that use
/// generic error types. It has no underlying source error.
impl std::error::Error for RedPandaError {}
22pub trait RedPandaHttpClient {
23 fn post(&mut self, url: &str, headers: &mut Vec<(String, String)>, data: Vec<u8>)->Result<Vec<u8>,RedPandaError>;
25
26 fn get(&mut self, url: &str)->Result<Vec<u8>, RedPandaError>;
27}
28
29
30
31impl<'a> Consumer<'a> {
32 fn create()->Consumer<'a> {
33 Consumer { format: "json", auto_offset_reset: "earliest", auto_commit_enable: "false", fetch_min_bytes: "0", consumer_request_timeout_ms: "10000" }
34 }
35}
36
37#[derive(Serialize)]
38pub struct CommitState {
39 topics: HashMap<String,HashMap<u16,u64>>
40}
41#[derive(Serialize)]
42pub struct CommitPartition {
43 topic: String,
44 partition: u16,
45 offset: u64
46}
47
48impl CommitState {
49 pub fn create()->CommitState {
50 CommitState { topics: HashMap::new() }
51 }
52
53 pub fn process_record(&mut self, record: &Record) {
54 let topic = self.topics.get_mut(&record.topic);
55 match topic {
56 None => {
57 let mut new_topic: HashMap::<u16,u64> = HashMap::new();
58 new_topic.insert(record.partition, record.offset + 1);
59 self.topics.insert(record.topic.to_owned(), new_topic);
60 }
61 Some(map) => {
62 map.insert(record.partition, record.offset + 1);
63 }
64 }
65 }
66
67 pub fn partition_list(&self)->Vec<CommitPartition> {
68 let mut result: Vec<CommitPartition> = vec![];
69 for (topic,partitions) in &self.topics {
70 for (partition,offset) in partitions.iter() {
71 result.push(CommitPartition {topic : topic.clone(), partition: *partition, offset: *offset});
72 }
73 }
74 result
75 }
76}
77#[derive(Deserialize)]
78#[derive(Clone)]
79struct ConsumerResponse {
80 instance_id: String,
81 base_uri: String
82}
83#[derive(Serialize)]
84struct SubscribeRequest<'a> {
85 topics: Vec<&'a str>
86}
87
88#[derive(Deserialize,Serialize)]
89pub struct Record {
90 pub topic: String,
91 #[serde(with="base64")]
92 pub key: Vec<u8>,
93 #[serde(with="base64")]
94 pub value: Vec<u8>,
95 pub partition: u16,
96 pub offset: u64
97
98}
99
100#[derive(Clone)]
101pub struct RedPandaClient {
102 inital_url: String,
103 group: String,
104 consumer_response: Option<ConsumerResponse>,
105}
106
107impl RedPandaClient {
108 pub fn new(http_client: &mut Box<dyn RedPandaHttpClient>, inital_url: &str, group: &str)->Result<RedPandaClient,RedPandaError> {
109 let mut client = RedPandaClient { inital_url: inital_url.to_owned(), group: group.to_owned(), consumer_response: Option::None};
110 let consumer = Consumer::create();
111 let body = serde_json::to_vec(&consumer)
112 .map_err(|_| RedPandaError("Error serializing JSON request".to_owned()))?
113 ;
114 let url = format!("{}{}", client.inital_url, client.group);
115 let mut headers = vec![("Content-Type".to_owned(),"application/vnd.kafka.v2+json".to_owned())];
116 if DEBUG {
117 println!("Initializing using url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&consumer).unwrap())
118 }
119 let result = http_client.post(&url, &mut headers, body)?;
120 if DEBUG {
121 println!("Result text:\n{}", from_utf8(result.as_slice()).map_err(|_| RedPandaError("Issues creating utf8".to_owned()))?);
122 }
123 client.consumer_response = Some(serde_json::from_slice(&result).map_err(|_| RedPandaError("Error parsing JSON Red Panda reply".to_owned()))?);
124 Ok(client)
125 }
126
127 pub fn register_topic(&mut self, client: &mut Box<dyn RedPandaHttpClient>, topics: Vec<&str>) -> Result<(), RedPandaError> {
128 let subscr = SubscribeRequest{topics};
129 let url = format!("{}/subscription",self.consumer_response.as_ref().unwrap().base_uri);
130 let body = serde_json::to_vec(&subscr)
131 .map_err(|_| RedPandaError("Error serializing subscription request".to_owned()))?;
132 if DEBUG {
133 println!("Registering topic using url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&subscr).unwrap())
134 }
135 let _ = client.post(&url, &mut Default::default(), body);
136 Ok(())
137 }
138
139 pub fn poll(&mut self, client: &mut Box<dyn RedPandaHttpClient>, bytecount: i32) -> Result<Vec<Record>,RedPandaError> {
140 let url = format!("{}/records?timeout=10000&&max_bytes={}",self.consumer_response.as_ref().unwrap().base_uri,bytecount);
141 if DEBUG {
142 println!("Calling get from url: {}",url);
143 }
144 let records = client.get(&url)?;
145 let parsed:Vec<Record> = serde_json::from_slice(&records).map_err(|_| RedPandaError("Error parsing polling response".to_owned()))?;
146 if DEBUG {
147 println!("Polled from url: {}\nBody:\n{}",url,serde_json::to_string_pretty(&parsed).unwrap());
148 }
149 Ok(parsed)
150 }
151
152
153 pub fn commit_state(&mut self, client: &mut Box<dyn RedPandaHttpClient>, state: &CommitState) -> Result<(), RedPandaError> {
154 let partitions = state.partition_list();
155 let commits = HashMap::from([("partitions".to_owned(),partitions)]);
156 let body = serde_json::to_vec(&commits ).map_err(|_| RedPandaError("Error serializing commit state".to_owned()))?;
157 let url = format!("{}/offsets",self.consumer_response.as_ref().unwrap().base_uri);
159 if DEBUG {
160 println!("Committing to url:{}\nBody:\n{}",url,serde_json::to_string_pretty(&commits).unwrap());
161 }
162 let _ = client.post(&url, &mut Default::default(), body);
163 Ok(())
164 }
165
166 pub fn instance_id(&self) -> Option<String> {
167 Some(self.consumer_response.as_ref().unwrap().instance_id.clone())
168 }
169}
170
171mod base64 {
172 use base64::Engine;
173 use serde::{Serialize, Deserialize};
174 use serde::{Deserializer, Serializer};
175
176 pub fn serialize<S: Serializer>(v: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
177 let base64 = base64::prelude::BASE64_STANDARD.encode(v);
180 String::serialize(&base64, s)
181 }
182
183
184 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
185 let base64 = String::deserialize(d)?;
186 base64::prelude::BASE64_STANDARD.decode(base64.as_bytes())
187 .map_err(serde::de::Error::custom)
188 }
189}