iggy_binary_protocol/client/binary_consumer_groups/
mod.rs1use crate::{BinaryClient, ConsumerGroupClient};
20
21use crate::utils::auth::fail_if_not_authenticated;
22use crate::utils::mapper;
23use iggy_common::create_consumer_group::CreateConsumerGroup;
24use iggy_common::delete_consumer_group::DeleteConsumerGroup;
25use iggy_common::get_consumer_group::GetConsumerGroup;
26use iggy_common::get_consumer_groups::GetConsumerGroups;
27use iggy_common::join_consumer_group::JoinConsumerGroup;
28use iggy_common::leave_consumer_group::LeaveConsumerGroup;
29use iggy_common::{ConsumerGroup, ConsumerGroupDetails, Identifier, IggyError};
30
31#[async_trait::async_trait]
32impl<B: BinaryClient> ConsumerGroupClient for B {
33 async fn get_consumer_group(
34 &self,
35 stream_id: &Identifier,
36 topic_id: &Identifier,
37 group_id: &Identifier,
38 ) -> Result<Option<ConsumerGroupDetails>, IggyError> {
39 fail_if_not_authenticated(self).await?;
40 let response = self
41 .send_with_response(&GetConsumerGroup {
42 stream_id: stream_id.clone(),
43 topic_id: topic_id.clone(),
44 group_id: group_id.clone(),
45 })
46 .await?;
47 if response.is_empty() {
48 return Ok(None);
49 }
50
51 mapper::map_consumer_group(response).map(Some)
52 }
53
54 async fn get_consumer_groups(
55 &self,
56 stream_id: &Identifier,
57 topic_id: &Identifier,
58 ) -> Result<Vec<ConsumerGroup>, IggyError> {
59 fail_if_not_authenticated(self).await?;
60 let response = self
61 .send_with_response(&GetConsumerGroups {
62 stream_id: stream_id.clone(),
63 topic_id: topic_id.clone(),
64 })
65 .await?;
66 mapper::map_consumer_groups(response)
67 }
68
69 async fn create_consumer_group(
70 &self,
71 stream_id: &Identifier,
72 topic_id: &Identifier,
73 name: &str,
74 group_id: Option<u32>,
75 ) -> Result<ConsumerGroupDetails, IggyError> {
76 fail_if_not_authenticated(self).await?;
77 let response = self
78 .send_with_response(&CreateConsumerGroup {
79 stream_id: stream_id.clone(),
80 topic_id: topic_id.clone(),
81 name: name.to_string(),
82 group_id,
83 })
84 .await?;
85 mapper::map_consumer_group(response)
86 }
87
88 async fn delete_consumer_group(
89 &self,
90 stream_id: &Identifier,
91 topic_id: &Identifier,
92 group_id: &Identifier,
93 ) -> Result<(), IggyError> {
94 fail_if_not_authenticated(self).await?;
95 self.send_with_response(&DeleteConsumerGroup {
96 stream_id: stream_id.clone(),
97 topic_id: topic_id.clone(),
98 group_id: group_id.clone(),
99 })
100 .await?;
101 Ok(())
102 }
103
104 async fn join_consumer_group(
105 &self,
106 stream_id: &Identifier,
107 topic_id: &Identifier,
108 group_id: &Identifier,
109 ) -> Result<(), IggyError> {
110 fail_if_not_authenticated(self).await?;
111 self.send_with_response(&JoinConsumerGroup {
112 stream_id: stream_id.clone(),
113 topic_id: topic_id.clone(),
114 group_id: group_id.clone(),
115 })
116 .await?;
117 Ok(())
118 }
119
120 async fn leave_consumer_group(
121 &self,
122 stream_id: &Identifier,
123 topic_id: &Identifier,
124 group_id: &Identifier,
125 ) -> Result<(), IggyError> {
126 fail_if_not_authenticated(self).await?;
127 self.send_with_response(&LeaveConsumerGroup {
128 stream_id: stream_id.clone(),
129 topic_id: topic_id.clone(),
130 group_id: group_id.clone(),
131 })
132 .await?;
133 Ok(())
134 }
135}