redpanda_http/
publisher.rs1use base64::{prelude::BASE64_STANDARD, Engine};
2use serde::{Deserialize, Serialize, ser::SerializeMap};
3use simplehttp::simplehttp::SimpleHttpClient;
4
5use crate::{base64_option, RedPandaError};
6
7
8#[derive(Deserialize)]
9pub struct PublishRecord {
10 pub key: Option<Vec<u8>>,
11 #[serde(with="base64_option")]
12 pub value: Option<Vec<u8>>,
13}
14
15impl PublishRecord {
16 pub fn from_string(message: String)->Self {
17 Self { key: None, value: Some(message.into_bytes()) }
18 }
19
20 pub fn from_bytes(value: Option<&[u8]>)->Self {
21 PublishRecord { key: None, value: value.map(|f|f.to_vec()) }
22 }
23}
24
25#[derive(Serialize)]
26pub struct PublishRecordList {
27 pub records: Vec<PublishRecord>
28}
29
30impl PublishRecordList {
31 pub fn from_string(message: String)->Self {
32 PublishRecordList{records: vec![PublishRecord::from_string(message)]}
33 }
34}
35
36impl Serialize for PublishRecord {
37 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
38 where
39 S: serde::Serializer {
40 let field_count = self.key.as_ref().map(|_|1).unwrap_or(0) +
41 self.value.as_ref().map(|_|1).unwrap_or(0);
42
43 let mut map = serializer.serialize_map(Some(field_count))?;
45 if self.key.is_some() {
46 map.serialize_entry("key",&BASE64_STANDARD.encode(self.key.as_ref().unwrap()))?;
47 }
48 if self.value.is_some() {
49 map.serialize_entry("value",&BASE64_STANDARD.encode(self.value.as_ref().unwrap()))?;
50 }
51 map.end()
52 }
53}
54
55pub struct Publisher {
56 inital_url: String,
57 client: Box<dyn SimpleHttpClient>,
58}
59
60impl Publisher {
61
62 pub fn new(http_client: Box<dyn SimpleHttpClient>, inital_url: &str)->Self {
63 Self {inital_url: inital_url.to_owned(), client: http_client}
64 }
65
66 pub fn publish(&mut self, topic: String, record: PublishRecordList)->Result<(), RedPandaError> {
68 let url = format!("{}topics/{}",self.inital_url,topic);
69 let l = serde_json::to_vec(&record).map_err(|e| RedPandaError::nested("error serializing publish", Box::new(e)))?;
70 let _reply = self.client.post(&url, &[("Content-Type","application/vnd.kafka.binary.v2+json")], &l[..])
71 .map_err(|e| RedPandaError::nested("error publishing",Box::new(e)))?;
72 Ok(())
73 }
74}