iggy_binary_protocol/client/binary_consumer_groups/
mod.rs

1/* Licensed to the Apache Software Foundation (ASF) under one
2 * or more contributor license agreements.  See the NOTICE file
3 * distributed with this work for additional information
4 * regarding copyright ownership.  The ASF licenses this file
5 * to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance
7 * with the License.  You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied.  See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19use crate::{BinaryClient, ConsumerGroupClient};
20
21use crate::utils::auth::fail_if_not_authenticated;
22use crate::utils::mapper;
23use iggy_common::create_consumer_group::CreateConsumerGroup;
24use iggy_common::delete_consumer_group::DeleteConsumerGroup;
25use iggy_common::get_consumer_group::GetConsumerGroup;
26use iggy_common::get_consumer_groups::GetConsumerGroups;
27use iggy_common::join_consumer_group::JoinConsumerGroup;
28use iggy_common::leave_consumer_group::LeaveConsumerGroup;
29use iggy_common::{ConsumerGroup, ConsumerGroupDetails, Identifier, IggyError};
30
31#[async_trait::async_trait]
32impl<B: BinaryClient> ConsumerGroupClient for B {
33    async fn get_consumer_group(
34        &self,
35        stream_id: &Identifier,
36        topic_id: &Identifier,
37        group_id: &Identifier,
38    ) -> Result<Option<ConsumerGroupDetails>, IggyError> {
39        fail_if_not_authenticated(self).await?;
40        let response = self
41            .send_with_response(&GetConsumerGroup {
42                stream_id: stream_id.clone(),
43                topic_id: topic_id.clone(),
44                group_id: group_id.clone(),
45            })
46            .await?;
47        if response.is_empty() {
48            return Ok(None);
49        }
50
51        mapper::map_consumer_group(response).map(Some)
52    }
53
54    async fn get_consumer_groups(
55        &self,
56        stream_id: &Identifier,
57        topic_id: &Identifier,
58    ) -> Result<Vec<ConsumerGroup>, IggyError> {
59        fail_if_not_authenticated(self).await?;
60        let response = self
61            .send_with_response(&GetConsumerGroups {
62                stream_id: stream_id.clone(),
63                topic_id: topic_id.clone(),
64            })
65            .await?;
66        mapper::map_consumer_groups(response)
67    }
68
69    async fn create_consumer_group(
70        &self,
71        stream_id: &Identifier,
72        topic_id: &Identifier,
73        name: &str,
74        group_id: Option<u32>,
75    ) -> Result<ConsumerGroupDetails, IggyError> {
76        fail_if_not_authenticated(self).await?;
77        let response = self
78            .send_with_response(&CreateConsumerGroup {
79                stream_id: stream_id.clone(),
80                topic_id: topic_id.clone(),
81                name: name.to_string(),
82                group_id,
83            })
84            .await?;
85        mapper::map_consumer_group(response)
86    }
87
88    async fn delete_consumer_group(
89        &self,
90        stream_id: &Identifier,
91        topic_id: &Identifier,
92        group_id: &Identifier,
93    ) -> Result<(), IggyError> {
94        fail_if_not_authenticated(self).await?;
95        self.send_with_response(&DeleteConsumerGroup {
96            stream_id: stream_id.clone(),
97            topic_id: topic_id.clone(),
98            group_id: group_id.clone(),
99        })
100        .await?;
101        Ok(())
102    }
103
104    async fn join_consumer_group(
105        &self,
106        stream_id: &Identifier,
107        topic_id: &Identifier,
108        group_id: &Identifier,
109    ) -> Result<(), IggyError> {
110        fail_if_not_authenticated(self).await?;
111        self.send_with_response(&JoinConsumerGroup {
112            stream_id: stream_id.clone(),
113            topic_id: topic_id.clone(),
114            group_id: group_id.clone(),
115        })
116        .await?;
117        Ok(())
118    }
119
120    async fn leave_consumer_group(
121        &self,
122        stream_id: &Identifier,
123        topic_id: &Identifier,
124        group_id: &Identifier,
125    ) -> Result<(), IggyError> {
126        fail_if_not_authenticated(self).await?;
127        self.send_with_response(&LeaveConsumerGroup {
128            stream_id: stream_id.clone(),
129            topic_id: topic_id.clone(),
130            group_id: group_id.clone(),
131        })
132        .await?;
133        Ok(())
134    }
135}