rafka_consumer/
consumer.rs1use rafka_core::proto::rafka::{
2 broker_service_client::BrokerServiceClient,
3 ConsumeRequest, SubscribeRequest, AcknowledgeRequest, UpdateOffsetRequest,
4 RegisterRequest, ClientType,
5};
6use tonic::Request;
7use tokio::sync::mpsc;
8use uuid::Uuid;
9
10pub struct Consumer {
11 client: BrokerServiceClient<tonic::transport::Channel>,
12 consumer_id: String,
13 _current_offset: i64,
14}
15
16impl Consumer {
17 pub async fn new(addr: &str) -> Result<Self, Box<dyn std::error::Error>> {
18 let mut client = BrokerServiceClient::connect(format!("http://{}", addr)).await?;
19 let consumer_id = Uuid::new_v4().to_string();
20
21 client
23 .register(Request::new(RegisterRequest {
24 client_id: consumer_id.clone(),
25 client_type: ClientType::Consumer as i32,
26 }))
27 .await?;
28
29 println!("Consumer registered with ID: {}", consumer_id);
30
31 Ok(Self {
32 client,
33 consumer_id,
34 _current_offset: 0,
35 })
36 }
37
38 pub async fn subscribe(&mut self, topic: String) -> Result<(), Box<dyn std::error::Error>> {
39 self.client
40 .subscribe(Request::new(SubscribeRequest {
41 consumer_id: self.consumer_id.clone(),
42 topic,
43 }))
44 .await?;
45 Ok(())
46 }
47
48 pub async fn consume(&mut self, topic: String) -> Result<mpsc::Receiver<String>, Box<dyn std::error::Error>> {
49 let (tx, rx) = mpsc::channel(100);
50 let mut stream = self.client
51 .consume(Request::new(ConsumeRequest {
52 id: self.consumer_id.clone(),
53 topic: topic.clone(),
54 }))
55 .await?
56 .into_inner();
57
58 let consumer_id = self.consumer_id.clone();
59 let mut client = self.client.clone();
60
61 tokio::spawn(async move {
62 while let Ok(Some(message)) = stream.message().await {
63 let _ = tx.send(message.payload).await;
64
65 let _ = client
67 .acknowledge(Request::new(AcknowledgeRequest {
68 consumer_id: consumer_id.clone(),
69 topic: topic.clone(),
70 message_id: message.message_id,
71 }))
72 .await;
73
74 let _ = client
76 .update_offset(Request::new(UpdateOffsetRequest {
77 consumer_id: consumer_id.clone(),
78 topic: topic.clone(),
79 offset: message.offset,
80 }))
81 .await;
82 }
83 });
84
85 Ok(rx)
86 }
87}