franz_client/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::{
4    io::{BufRead, BufReader, BufWriter, Write},
5    net::{TcpStream, ToSocketAddrs},
6    thread,
7    time::Duration,
8};
9
/// The role a client asks the broker for: producing or consuming.
///
/// NOTE(review): this was originally described as an abstraction over a number
/// sent to the server, but the handshakes visible in this file spell the role
/// out as text (`api=produce` / `api=consume`) and do not reference this enum.
/// Confirm the intended wire encoding before relying on it.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Api {
    /// The client wants to act as a producer.
    Produce,
    /// The client wants to act as a consumer.
    Consume,
}
17
/// A simple Franz Producer that sends messages to the broker.
///
/// Note: messages are written through an internal [`BufWriter`], so
/// [`Producer::send`] may only queue bytes locally. Call [`Producer::flush`]
/// to push queued messages to the socket, or use
/// [`Producer::send_unbuffered`] to write and flush in a single call.
///
/// ```rust,ignore
/// use franz_client::{Producer, FranzClientError};
///
/// fn main() -> Result<(), FranzClientError> {
///     let mut producer = Producer::new("127.0.0.1:8085", "test")?;
///     producer.send_unbuffered("i was here! :3")?;
///     producer.flush()
/// }
/// ```
#[derive(Debug)]
pub struct Producer {
    /// Buffered writer over the TCP connection to the broker.
    inner: BufWriter<TcpStream>,
}
34
35#[derive(thiserror::Error, Debug)]
36pub enum FranzClientError {
37    #[error(transparent)]
38    AddrParseError(#[from] std::net::AddrParseError),
39    #[error(transparent)]
40    IoError(#[from] std::io::Error),
41}
42
43fn send_handshake(
44    sock: &mut BufWriter<TcpStream>,
45    handshake: &str,
46) -> Result<(), FranzClientError> {
47    sock.write_all(&(handshake.len() as u32).to_be_bytes())?;
48    sock.write_all(handshake.as_bytes())?;
49    Ok(sock.flush()?)
50}
51
52impl Producer {
53    pub fn new(broker: impl ToSocketAddrs, topic: &str) -> Result<Self, FranzClientError> {
54        let sock = TcpStream::connect(broker)?;
55        let mut sock = BufWriter::new(sock);
56
57        let handshake = format!("version=1,topic={},api=produce", topic);
58        send_handshake(&mut sock, &handshake)?;
59
60        Ok(Producer { inner: sock })
61    }
62
63    pub fn send<D: AsRef<[u8]>>(&mut self, msg: D) -> Result<(), FranzClientError> {
64        // IF WE SEND MESSAGE THAT DOES NOT MATCH
65        // EXPECTED BYTES, WARN
66        self.inner.write_all(msg.as_ref())?;
67        self.inner.write_all(b"\n")?;
68
69        Ok(())
70    }
71
72    pub fn send_unbuffered<D: AsRef<[u8]>>(&mut self, msg: D) -> Result<(), FranzClientError> {
73        self.send(msg)?;
74        self.inner.flush()?;
75        Ok(())
76    }
77
78    pub fn flush(&mut self) -> Result<(), FranzClientError> {
79        Ok(self.inner.flush()?)
80    }
81}
82
/// A simple Franz Consumer that receives messages from the broker
///
/// Creating a consumer with [`Consumer::new`] also starts a background thread
/// that writes `PING\n` to the broker every 60 seconds as a keepalive.
///
/// ```rust,ignore
/// use franz_client::{Consumer, FranzClientError};
///
/// fn main() -> Result<(), FranzClientError> {
///     let mut consumer = Consumer::new("127.0.0.1:8085", "test", None)?;
///     let msg = consumer.recv()?;
///     Ok(println!("{}", String::from_utf8_lossy(&msg)))
/// }
/// ```
#[derive(Debug)]
pub struct Consumer {
    /// Buffered reader over the TCP connection to the broker.
    inner: BufReader<TcpStream>,
}
97
98impl Consumer {
99    pub fn new(
100        broker: impl ToSocketAddrs,
101        topic: &str,
102        group: Option<u16>,
103    ) -> Result<Self, FranzClientError> {
104        let sock = TcpStream::connect(broker)?;
105        let sock_c = sock.try_clone()?;
106        let mut sock = BufWriter::new(sock);
107
108        let handshake = match group {
109            Some(g) => format!("version=1,topic={},group={},api=consume", topic, g),
110            None => format!("version=1,topic={},api=consume", topic),
111        };
112        send_handshake(&mut sock, &handshake)?;
113
114        // KEEPALIVE
115        thread::spawn(move || loop {
116            sock.write_all(b"PING\n").unwrap();
117            sock.flush().unwrap();
118            thread::sleep(Duration::from_secs(60));
119        });
120
121        let inner = BufReader::new(sock_c);
122
123        Ok(Consumer { inner })
124    }
125
126    pub fn recv(&mut self) -> Result<Vec<u8>, FranzClientError> {
127        // WE CAN READ EXPECTED BYTES FROM FRANZ
128        // AND ALLOCATE OUR BUFFER WITH THE EXPECTED BYTES
129        let mut buf = Vec::new();
130        self.inner.read_until(b'\n', &mut buf)?;
131        Ok(buf)
132    }
133}