Skip to main content

rafka_consumer/
consumer.rs

1use rafka_core::proto::rafka::{
2    broker_service_client::BrokerServiceClient,
3    ConsumeRequest, SubscribeRequest, AcknowledgeRequest, UpdateOffsetRequest,
4    RegisterRequest, ClientType,
5};
6use tonic::Request;
7use tokio::sync::mpsc;
8use uuid::Uuid;
9
10pub struct Consumer {
11    client: BrokerServiceClient<tonic::transport::Channel>,
12    consumer_id: String,
13    _current_offset: i64,
14}
15
16impl Consumer {
17    pub async fn new(addr: &str) -> Result<Self, Box<dyn std::error::Error>> {
18        let mut client = BrokerServiceClient::connect(format!("http://{}", addr)).await?;
19        let consumer_id = Uuid::new_v4().to_string();
20
21        // Register consumer
22        client
23            .register(Request::new(RegisterRequest {
24                client_id: consumer_id.clone(),
25                client_type: ClientType::Consumer as i32,
26            }))
27            .await?;
28
29        println!("Consumer registered with ID: {}", consumer_id);
30
31        Ok(Self {
32            client,
33            consumer_id,
34            _current_offset: 0,
35        })
36    }
37
38    pub async fn subscribe(&mut self, topic: String) -> Result<(), Box<dyn std::error::Error>> {
39        self.client
40            .subscribe(Request::new(SubscribeRequest {
41                consumer_id: self.consumer_id.clone(),
42                topic,
43            }))
44            .await?;
45        Ok(())
46    }
47
48    pub async fn consume(&mut self, topic: String) -> Result<mpsc::Receiver<String>, Box<dyn std::error::Error>> {
49        let (tx, rx) = mpsc::channel(100);
50        let mut stream = self.client
51            .consume(Request::new(ConsumeRequest {
52                id: self.consumer_id.clone(),
53                topic: topic.clone(),
54            }))
55            .await?
56            .into_inner();
57
58        let consumer_id = self.consumer_id.clone();
59        let mut client = self.client.clone();
60
61        tokio::spawn(async move {
62            while let Ok(Some(message)) = stream.message().await {
63                let _ = tx.send(message.payload).await;
64                
65                // Acknowledge message
66                let _ = client
67                    .acknowledge(Request::new(AcknowledgeRequest {
68                        consumer_id: consumer_id.clone(),
69                        topic: topic.clone(),
70                        message_id: message.message_id,
71                    }))
72                    .await;
73
74                // Update offset
75                let _ = client
76                    .update_offset(Request::new(UpdateOffsetRequest {
77                        consumer_id: consumer_id.clone(),
78                        topic: topic.clone(),
79                        offset: message.offset,
80                    }))
81                    .await;
82            }
83        });
84
85        Ok(rx)
86    }
87}