redpanda_http/
publisher.rs

1use base64::{prelude::BASE64_STANDARD, Engine};
2use serde::{Deserialize, Serialize, ser::SerializeMap};
3use simplehttp::simplehttp::SimpleHttpClient;
4
5use crate::{base64_option, RedPandaError};
6
7
8#[derive(Deserialize)]
9pub struct PublishRecord {
10    pub key: Option<Vec<u8>>,
11    #[serde(with="base64_option")]
12    pub value: Option<Vec<u8>>,
13}
14
15impl PublishRecord {
16    pub fn from_string(message: String)->Self {
17        Self { key: None, value: Some(message.into_bytes()) }
18    }
19
20    pub fn from_bytes(value: Option<&[u8]>)->Self {
21        PublishRecord { key: None, value: value.map(|f|f.to_vec()) }
22    }
23}
24
25#[derive(Serialize)]
26pub struct PublishRecordList {
27    pub records: Vec<PublishRecord>
28}
29
30impl PublishRecordList {
31    pub fn from_string(message: String)->Self {
32        PublishRecordList{records: vec![PublishRecord::from_string(message)]}
33    } 
34}
35
36impl Serialize for PublishRecord {
37    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
38    where
39        S: serde::Serializer {
40        let field_count = self.key.as_ref().map(|_|1).unwrap_or(0) + 
41            self.value.as_ref().map(|_|1).unwrap_or(0);
42
43        // todo!()
44        let mut map = serializer.serialize_map(Some(field_count))?;
45        if self.key.is_some() {
46            map.serialize_entry("key",&BASE64_STANDARD.encode(self.key.as_ref().unwrap()))?;
47        }
48        if self.value.is_some() {
49            map.serialize_entry("value",&BASE64_STANDARD.encode(self.value.as_ref().unwrap()))?;
50        }
51        map.end()
52    }
53}
54
55pub struct Publisher {
56    inital_url: String,
57    client: Box<dyn SimpleHttpClient>,
58}
59
60impl Publisher {
61
62    pub fn new(http_client: Box<dyn SimpleHttpClient>, inital_url: &str)->Self {
63        Self {inital_url: inital_url.to_owned(), client: http_client}
64    }
65
66    // TODO: return list of OffsetRecordSent
67    pub fn publish(&mut self, topic: String, record: PublishRecordList)->Result<(), RedPandaError> {
68        let url = format!("{}topics/{}",self.inital_url,topic);
69        let l = serde_json::to_vec(&record).map_err(|e| RedPandaError::nested("error serializing publish", Box::new(e)))?;
70        let _reply = self.client.post(&url, &[("Content-Type","application/vnd.kafka.binary.v2+json")], &l[..])
71            .map_err(|e| RedPandaError::nested("error publishing",Box::new(e)))?;
72        Ok(())
73    }
74}