use std::thread;
use rustfs_kafka::consumer::{Consumer, MessageSets};
use rustfs_kafka::error::{ConsumerError, Error, Result};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, info};
use crate::AsyncKafkaClient;
/// Requests sent from the async-facing `AsyncConsumer` to its background consumer thread.
///
/// `Poll` and `Commit` each carry a one-shot sender that the background thread uses to
/// hand the outcome of the operation back to the waiting caller.
enum ConsumerCommand {
/// Ask the background thread to call `poll()` on the consumer.
Poll {
/// Reply channel for the fetched message sets (or the poll error).
response: oneshot::Sender<Result<MessageSets>>,
},
/// Ask the background thread to call `commit_consumed()` on the consumer.
Commit {
/// Reply channel for the commit outcome.
response: oneshot::Sender<Result<()>>,
},
/// Ask the background thread to leave its command loop; no reply is sent.
Shutdown,
}
/// Async handle to a `Consumer` that is driven on a dedicated OS thread.
///
/// Commands are sent to that thread over a bounded `mpsc` channel and each result is
/// returned over a one-shot channel.
pub struct AsyncConsumer {
/// Command channel to the background thread.
sender: mpsc::Sender<ConsumerCommand>,
/// Join handle of the background thread; `None` once `close` or `Drop` has taken it
/// in order to join the thread.
handle: Option<thread::JoinHandle<()>>,
}
impl AsyncConsumer {
pub async fn from_hosts(
hosts: Vec<String>,
group: String,
topics: Vec<String>,
) -> Result<Self> {
let consumer = {
let mut builder = Consumer::from_hosts(hosts).with_group(group);
for topic in topics {
builder = builder.with_topic(topic);
}
builder.create()?
};
let (sender, mut receiver) = mpsc::channel::<ConsumerCommand>(64);
let handle = thread::spawn(move || {
let mut consumer = consumer;
loop {
match receiver.blocking_recv() {
Some(ConsumerCommand::Poll { response }) => {
let result = consumer.poll();
let _ = response.send(result);
}
Some(ConsumerCommand::Commit { response }) => {
let result = consumer.commit_consumed();
let _ = response.send(result);
}
Some(ConsumerCommand::Shutdown) | None => {
debug!("AsyncConsumer thread shutting down");
break;
}
}
}
info!("AsyncConsumer background thread exited");
});
info!("AsyncConsumer created");
Ok(Self {
sender,
handle: Some(handle),
})
}
pub async fn from_client(
client: AsyncKafkaClient,
group: String,
topics: Vec<String>,
) -> Result<Self> {
Self::from_hosts(client.bootstrap_hosts().to_vec(), group, topics).await
}
pub async fn poll(&mut self) -> Result<MessageSets> {
let (tx, rx) = oneshot::channel();
self.sender
.send(ConsumerCommand::Poll { response: tx })
.await
.map_err(|_| Error::Consumer(ConsumerError::NoTopicsAssigned))?;
rx.await
.map_err(|_| Error::Consumer(ConsumerError::NoTopicsAssigned))?
}
pub async fn commit(&mut self) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.sender
.send(ConsumerCommand::Commit { response: tx })
.await
.map_err(|_| Error::Consumer(ConsumerError::NoTopicsAssigned))?;
rx.await
.map_err(|_| Error::Consumer(ConsumerError::NoTopicsAssigned))?
}
pub async fn close(mut self) -> Result<()> {
if let Some(handle) = self.handle.take() {
let _ = self.sender.send(ConsumerCommand::Shutdown).await;
let _ = handle.join();
}
Ok(())
}
}
impl Drop for AsyncConsumer {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.join().ok();
}
}
}
#[cfg(test)]
mod tests {
    use rustfs_kafka::error::{ConnectionError, Error};

    use super::*;

    /// Attempts to build a consumer against `127.0.0.1:1`.
    ///
    /// Presumably nothing listens on that port, so the attempt is expected to fail —
    /// TODO confirm for the CI environment.
    async fn connect_to_dead_endpoint(
        group: &str,
        topic: &str,
    ) -> rustfs_kafka::error::Result<AsyncConsumer> {
        let hosts = vec![String::from("127.0.0.1:1")];
        let topics = vec![String::from(topic)];
        AsyncConsumer::from_hosts(hosts, String::from(group), topics).await
    }

    /// `from_hosts` reports `NoHostReachable` when the only host cannot be contacted.
    #[tokio::test]
    async fn from_hosts_fails_with_unreachable_hosts() {
        let result = connect_to_dead_endpoint("test-group", "test-topic").await;
        assert!(matches!(
            result,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
    }

    /// `from_client` fails when the client was created with an empty host list.
    #[tokio::test]
    async fn from_client_fails_with_unreachable_hosts() {
        let client = AsyncKafkaClient::new(vec![]).await.unwrap();
        let group = String::from("test-group");
        let topics = vec![String::from("test-topic")];
        let result = AsyncConsumer::from_client(client, group, topics).await;
        assert!(result.is_err());
    }

    /// NOTE(review): `from_hosts` is expected to fail here, so no `AsyncConsumer` and
    /// no background thread are ever created; this only covers the error path and
    /// does not exercise `Drop`.
    #[tokio::test]
    async fn drop_joins_background_thread_without_panic() {
        let result = connect_to_dead_endpoint("test-drop-group", "test-drop-topic").await;
        assert!(result.is_err());
    }
}