use bytes::BytesMut;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tracing::{error, info};
const PRODUCE_API_KEY: i16 = 0;
const PRODUCE_API_VERSION: i16 = 0;
const CORRELATION_ID: i32 = 1;
const CLIENT_ID: &str = "tiny-kafka-producer";
pub struct KafkaProducer {
broker_address: String,
stream: Option<TcpStream>,
}
impl KafkaProducer {
pub async fn new(broker_address: String) -> io::Result<Self> {
let mut producer = KafkaProducer {
broker_address,
stream: None,
};
producer.connect().await?;
Ok(producer)
}
async fn connect(&mut self) -> io::Result<()> {
info!("Connecting to Kafka broker at {}", self.broker_address);
let stream = TcpStream::connect(&self.broker_address).await?;
self.stream = Some(stream);
info!("Successfully connected to Kafka broker");
Ok(())
}
pub async fn send_message(
&mut self,
topic: &str,
partition: i32,
message: &str,
) -> io::Result<()> {
info!(
"Sending message to topic: {}, partition: {}",
topic, partition
);
let request = self.create_produce_request(topic, partition, message);
if let Some(ref mut stream) = self.stream {
stream.write_all(&request).await?;
stream.flush().await?;
self.receive_response().await?;
info!("Message sent successfully");
} else {
error!("Failed to send message: Not connected to Kafka broker");
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"Not connected to Kafka broker",
));
}
Ok(())
}
fn create_produce_request(&self, topic: &str, partition: i32, message: &str) -> BytesMut {
let mut buffer = BytesMut::new();
buffer.extend_from_slice(&[0, 0, 0, 0]);
buffer.extend_from_slice(&PRODUCE_API_KEY.to_be_bytes());
buffer.extend_from_slice(&PRODUCE_API_VERSION.to_be_bytes());
buffer.extend_from_slice(&CORRELATION_ID.to_be_bytes());
let client_id_bytes = CLIENT_ID.as_bytes();
buffer.extend_from_slice(&(client_id_bytes.len() as i16).to_be_bytes());
buffer.extend_from_slice(client_id_bytes);
buffer.extend_from_slice(&1i16.to_be_bytes());
buffer.extend_from_slice(&1000i32.to_be_bytes());
buffer.extend_from_slice(&1i32.to_be_bytes());
buffer.extend_from_slice(&(topic.len() as i16).to_be_bytes());
buffer.extend_from_slice(topic.as_bytes());
buffer.extend_from_slice(&1i32.to_be_bytes());
buffer.extend_from_slice(&partition.to_be_bytes());
let message_set_size_pos = buffer.len();
buffer.extend_from_slice(&[0, 0, 0, 0]);
let message_bytes = message.as_bytes();
buffer.extend_from_slice(&(message_bytes.len() as i32).to_be_bytes());
buffer.extend_from_slice(&[0]);
buffer.extend_from_slice(&[0]);
buffer.extend_from_slice(&(-1i32).to_be_bytes());
buffer.extend_from_slice(&(message_bytes.len() as i32).to_be_bytes());
buffer.extend_from_slice(message_bytes);
let message_set_size = (buffer.len() - message_set_size_pos - 4) as i32;
buffer[message_set_size_pos..message_set_size_pos + 4]
.copy_from_slice(&message_set_size.to_be_bytes());
let total_size = (buffer.len() - 4) as i32;
buffer[0..4].copy_from_slice(&total_size.to_be_bytes());
buffer
}
async fn receive_response(&mut self) -> io::Result<()> {
let mut size_buf = [0u8; 4];
if let Some(ref mut stream) = self.stream {
stream.read_exact(&mut size_buf).await?;
let response_size = i32::from_be_bytes(size_buf);
let mut response = vec![0u8; response_size as usize];
stream.read_exact(&mut response).await?;
info!("Received response from Kafka broker");
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tracing_test::traced_test;
#[tokio::test]
#[traced_test]
async fn test_new_producer_success() {
let addr = "127.0.0.1:9092".to_string();
let producer = KafkaProducer::new(addr)
.await
.expect("Failed to create producer");
assert!(producer.stream.is_some());
assert!(logs_contain("Successfully connected to Kafka broker"));
}
#[tokio::test]
#[traced_test]
async fn test_send_message() {
let mut producer = KafkaProducer::new("127.0.0.1:9092".to_string())
.await
.expect("Failed to create producer");
tokio::time::sleep(Duration::from_secs(1)).await;
let topic = "test-topic";
let partition = 0;
let message = "Hello Kafka!";
let result = producer.send_message(topic, partition, message).await;
match result {
Ok(_) => {
assert!(logs_contain("Message sent successfully"));
}
Err(e) => {
panic!(
"Failed to send message: {:?}\nLogs: {}",
e,
std::panic::Location::caller()
);
}
}
}
}