1#![doc = include_str!("../README.md")]
2
3use std::{
4 io::{BufRead, BufReader, BufWriter, Write},
5 net::{TcpStream, ToSocketAddrs},
6 thread,
7 time::Duration,
8};
9
10#[derive(Copy, Clone)]
13pub enum Api {
14 Produce,
15 Consume,
16}
17
18pub struct Producer {
32 inner: BufWriter<TcpStream>,
33}
34
35#[derive(thiserror::Error, Debug)]
36pub enum FranzClientError {
37 #[error(transparent)]
38 AddrParseError(#[from] std::net::AddrParseError),
39 #[error(transparent)]
40 IoError(#[from] std::io::Error),
41}
42
43fn send_handshake(
44 sock: &mut BufWriter<TcpStream>,
45 handshake: &str,
46) -> Result<(), FranzClientError> {
47 sock.write_all(&(handshake.len() as u32).to_be_bytes())?;
48 sock.write_all(handshake.as_bytes())?;
49 Ok(sock.flush()?)
50}
51
52impl Producer {
53 pub fn new(broker: impl ToSocketAddrs, topic: &str) -> Result<Self, FranzClientError> {
54 let sock = TcpStream::connect(broker)?;
55 let mut sock = BufWriter::new(sock);
56
57 let handshake = format!("version=1,topic={},api=produce", topic);
58 send_handshake(&mut sock, &handshake)?;
59
60 Ok(Producer { inner: sock })
61 }
62
63 pub fn send<D: AsRef<[u8]>>(&mut self, msg: D) -> Result<(), FranzClientError> {
64 self.inner.write_all(msg.as_ref())?;
67 self.inner.write_all(b"\n")?;
68
69 Ok(())
70 }
71
72 pub fn send_unbuffered<D: AsRef<[u8]>>(&mut self, msg: D) -> Result<(), FranzClientError> {
73 self.send(msg)?;
74 self.inner.flush()?;
75 Ok(())
76 }
77
78 pub fn flush(&mut self) -> Result<(), FranzClientError> {
79 Ok(self.inner.flush()?)
80 }
81}
82
83pub struct Consumer {
95 inner: BufReader<TcpStream>,
96}
97
98impl Consumer {
99 pub fn new(
100 broker: impl ToSocketAddrs,
101 topic: &str,
102 group: Option<u16>,
103 ) -> Result<Self, FranzClientError> {
104 let sock = TcpStream::connect(broker)?;
105 let sock_c = sock.try_clone()?;
106 let mut sock = BufWriter::new(sock);
107
108 let handshake = match group {
109 Some(g) => format!("version=1,topic={},group={},api=consume", topic, g),
110 None => format!("version=1,topic={},api=consume", topic),
111 };
112 send_handshake(&mut sock, &handshake)?;
113
114 thread::spawn(move || loop {
116 sock.write_all(b"PING\n").unwrap();
117 sock.flush().unwrap();
118 thread::sleep(Duration::from_secs(60));
119 });
120
121 let inner = BufReader::new(sock_c);
122
123 Ok(Consumer { inner })
124 }
125
126 pub fn recv(&mut self) -> Result<Vec<u8>, FranzClientError> {
127 let mut buf = Vec::new();
130 self.inner.read_until(b'\n', &mut buf)?;
131 Ok(buf)
132 }
133}