Skip to main content

kafkit_client/admin/
mod.rs

1//! Admin client operations for topics, groups, configs, and cluster metadata.
2//!
3//! ```no_run
4//! # async fn example() -> kafkit_client::Result<()> {
5//! use kafkit_client::{KafkaClient, NewTopic};
6//!
7//! let admin = KafkaClient::new("localhost:9092").admin().connect().await?;
8//! admin
9//!     .create_topics([NewTopic::new("orders", 3, 1)])
10//!     .await?;
11//! # Ok(())
12//! # }
13//! ```
14//!
15use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use bytes::Bytes;
19use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
20use kafka_protocol::messages::alter_user_scram_credentials_request::ScramCredentialUpsertion;
21use kafka_protocol::messages::create_partitions_request::{
22    CreatePartitionsAssignment, CreatePartitionsTopic,
23};
24use kafka_protocol::messages::create_topics_request::{CreatableTopic, CreatableTopicConfig};
25use kafka_protocol::messages::describe_configs_request::DescribeConfigsResource;
26use kafka_protocol::messages::incremental_alter_configs_request::{
27    AlterConfigsResource, AlterableConfig,
28};
29use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
30use kafka_protocol::messages::update_features_request::FeatureUpdateKey;
31use kafka_protocol::messages::{
32    AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse,
33    ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, CreatePartitionsRequest,
34    CreatePartitionsResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteGroupsRequest,
35    DeleteGroupsResponse, DeleteTopicsRequest, DeleteTopicsResponse, DescribeClusterRequest,
36    DescribeClusterResponse, DescribeConfigsRequest, DescribeConfigsResponse,
37    IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, ListGroupsRequest,
38    ListGroupsResponse, MetadataRequest, MetadataResponse, UpdateFeaturesRequest,
39    UpdateFeaturesResponse,
40};
41use kafka_protocol::protocol::{Request, StrBytes};
42use tracing::{debug, instrument};
43use uuid::Uuid;
44
45use crate::config::{AdminConfig, SaslMechanism};
46use crate::constants::{
47    ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP, CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
48    CREATE_PARTITIONS_VERSION_CAP, CREATE_TOPICS_VERSION_CAP, DELETE_GROUPS_VERSION_CAP,
49    DELETE_TOPICS_VERSION_CAP, DESCRIBE_CLUSTER_VERSION_CAP, DESCRIBE_CONFIGS_VERSION_CAP,
50    INCREMENTAL_ALTER_CONFIGS_VERSION_CAP, LIST_GROUPS_VERSION_CAP, METADATA_VERSION_CAP,
51    UPDATE_FEATURES_VERSION_CAP,
52};
53use crate::network::scram;
54use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
55use crate::{AdminError, Result};
56
/// Specification of a topic to create through the admin client.
///
/// Values are usually built with `NewTopic::new` and refined with
/// `NewTopic::with_config`. The partition count and replication factor must both
/// be positive; non-positive values are rejected when the specification is
/// converted into a request entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewTopic {
    /// Name of the topic to create.
    pub name: String,
    /// Number of partitions requested for the topic.
    pub num_partitions: i32,
    /// Replication factor requested for the topic.
    pub replication_factor: i16,
    /// Topic-level configuration entries (config name to value) sent with the request.
    pub configs: BTreeMap<String, String>,
}
69
70impl NewTopic {
71    /// Creates a new value.
72    pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
73        Self {
74            name: name.into(),
75            num_partitions,
76            replication_factor,
77            configs: BTreeMap::new(),
78        }
79    }
80
81    /// Sets config and returns the updated value.
82    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
83        self.configs.insert(key.into(), value.into());
84        self
85    }
86
87    fn into_request_topic(self) -> Result<CreatableTopic> {
88        let name = validate_topic_name(self.name)?;
89        if self.num_partitions <= 0 {
90            return Err(AdminError::InvalidPartitionCount {
91                partitions: self.num_partitions,
92            }
93            .into());
94        }
95        if self.replication_factor <= 0 {
96            return Err(AdminError::InvalidReplicationFactor {
97                replication_factor: self.replication_factor,
98            }
99            .into());
100        }
101
102        let configs = self
103            .configs
104            .into_iter()
105            .map(|(key, value)| {
106                CreatableTopicConfig::default()
107                    .with_name(StrBytes::from_string(key))
108                    .with_value(Some(StrBytes::from_string(value)))
109            })
110            .collect();
111
112        Ok(CreatableTopic::default()
113            .with_name(StrBytes::from_string(name).into())
114            .with_num_partitions(self.num_partitions)
115            .with_replication_factor(self.replication_factor)
116            .with_configs(configs))
117    }
118}
119
/// Describes a partition-count change for an existing topic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NewPartitions {
    /// Partition count sent as the `count` of the create-partitions request.
    /// Must be positive; non-positive values are rejected before sending.
    pub total_count: i32,
    /// Explicit replica assignments, one list of broker ids per entry.
    /// When empty, no assignments are sent with the request.
    pub assignments: Vec<Vec<i32>>,
}
128
129impl NewPartitions {
130    /// Increase To.
131    pub fn increase_to(total_count: i32) -> Self {
132        Self {
133            total_count,
134            assignments: Vec::new(),
135        }
136    }
137
138    /// Sets assignment and returns the updated value.
139    pub fn with_assignment<I>(mut self, broker_ids: I) -> Self
140    where
141        I: IntoIterator<Item = i32>,
142    {
143        self.assignments.push(broker_ids.into_iter().collect());
144        self
145    }
146
147    fn into_request_topic(self, topic_name: String) -> Result<CreatePartitionsTopic> {
148        let name = validate_topic_name(topic_name)?;
149        if self.total_count <= 0 {
150            return Err(AdminError::InvalidPartitionCount {
151                partitions: self.total_count,
152            }
153            .into());
154        }
155
156        let assignments = (!self.assignments.is_empty()).then(|| {
157            self.assignments
158                .into_iter()
159                .map(|broker_ids| {
160                    CreatePartitionsAssignment::default()
161                        .with_broker_ids(broker_ids.into_iter().map(Into::into).collect())
162                })
163                .collect()
164        });
165
166        Ok(CreatePartitionsTopic::default()
167            .with_name(StrBytes::from_string(name).into())
168            .with_count(self.total_count)
169            .with_assignments(assignments))
170    }
171}
172
173#[derive(Debug, Clone, PartialEq, Eq)]
174/// Topic Listing.
175pub struct TopicListing {
176    /// Name returned by Kafka.
177    pub name: String,
178    /// Topic Id.
179    pub topic_id: Option<Uuid>,
180    /// Is Internal.
181    pub is_internal: bool,
182}
183
/// Metadata for a single partition of a described topic.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicPartitionDescription {
    /// Partition index within the topic.
    pub partition: i32,
    /// Broker id reported as the partition leader.
    pub leader_id: i32,
    /// Leader epoch reported for the partition.
    pub leader_epoch: i32,
    /// Broker ids listed as replicas of the partition.
    pub replica_nodes: Vec<i32>,
    /// Broker ids listed as in-sync replicas.
    pub isr_nodes: Vec<i32>,
    /// Broker ids listed as offline replicas.
    pub offline_replicas: Vec<i32>,
}
200
201#[derive(Debug, Clone, PartialEq, Eq)]
202/// Topic Description.
203pub struct TopicDescription {
204    /// Name returned by Kafka.
205    pub name: String,
206    /// Topic Id.
207    pub topic_id: Option<Uuid>,
208    /// Is Internal.
209    pub is_internal: bool,
210    /// Partitions.
211    pub partitions: Vec<TopicPartitionDescription>,
212}
213
/// One broker entry from a describe-cluster response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BrokerDescription {
    /// Numeric broker id.
    pub broker_id: i32,
    /// Host name reported for the broker.
    pub host: String,
    /// Port reported for the broker.
    pub port: i32,
    /// Rack reported for the broker, if the response carried one.
    pub rack: Option<String>,
    /// The `is_fenced` flag copied from the response.
    pub is_fenced: bool,
}
228
229#[derive(Debug, Clone, PartialEq, Eq)]
230/// Cluster Description.
231pub struct ClusterDescription {
232    /// Cluster Id.
233    pub cluster_id: String,
234    /// Controller Id.
235    pub controller_id: i32,
236    /// Brokers.
237    pub brokers: Vec<BrokerDescription>,
238}
239
/// A named broker feature together with a version level.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BrokerFeatureLevel {
    /// Feature name.
    pub name: String,
    /// Version level associated with the feature.
    pub level: i16,
}
248
249#[derive(Debug, Clone, PartialEq, Eq)]
250/// Finalized broker feature update.
251pub struct FeatureUpdate {
252    /// Feature name.
253    pub name: String,
254    /// New finalized maximum version level. Values below 1 delete the finalized feature.
255    pub max_version_level: i16,
256    /// How the broker should apply an upgrade or downgrade.
257    pub upgrade_type: FeatureUpgradeType,
258}
259
260impl FeatureUpdate {
261    /// Creates an upgrade-only feature update.
262    pub fn upgrade(name: impl Into<String>, max_version_level: i16) -> Self {
263        Self {
264            name: name.into(),
265            max_version_level,
266            upgrade_type: FeatureUpgradeType::Upgrade,
267        }
268    }
269
270    /// Creates a safe downgrade or deletion update.
271    pub fn safe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
272        Self {
273            name: name.into(),
274            max_version_level,
275            upgrade_type: FeatureUpgradeType::SafeDowngrade,
276        }
277    }
278
279    /// Creates an unsafe downgrade or deletion update.
280    pub fn unsafe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
281        Self {
282            name: name.into(),
283            max_version_level,
284            upgrade_type: FeatureUpgradeType::UnsafeDowngrade,
285        }
286    }
287
288    fn into_request_update(self, version: i16) -> Result<FeatureUpdateKey> {
289        let name = validate_feature_name(self.name)?;
290        let allow_downgrade = self.upgrade_type.allows_downgrade();
291        let mut update = FeatureUpdateKey::default()
292            .with_feature(StrBytes::from_string(name))
293            .with_max_version_level(self.max_version_level);
294        if version == 0 {
295            update = update.with_allow_downgrade(allow_downgrade);
296        } else {
297            update = update.with_upgrade_type(self.upgrade_type.as_protocol_value());
298        }
299        Ok(update)
300    }
301}
302
/// How a finalized feature update may be applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeatureUpgradeType {
    /// Only allow upgrades.
    Upgrade,
    /// Allow safe, lossless downgrades.
    SafeDowngrade,
    /// Allow unsafe, potentially lossy downgrades.
    UnsafeDowngrade,
}

impl FeatureUpgradeType {
    /// Numeric code written into the request's upgrade-type field.
    fn as_protocol_value(self) -> i8 {
        match self {
            FeatureUpgradeType::Upgrade => 1,
            FeatureUpgradeType::SafeDowngrade => 2,
            FeatureUpgradeType::UnsafeDowngrade => 3,
        }
    }

    /// Whether this type permits lowering the feature level; used for the
    /// boolean `allow_downgrade` flag of request version 0.
    fn allows_downgrade(self) -> bool {
        match self {
            FeatureUpgradeType::Upgrade => false,
            FeatureUpgradeType::SafeDowngrade | FeatureUpgradeType::UnsafeDowngrade => true,
        }
    }
}
327
/// One group entry returned by `KafkaAdmin::list_consumer_groups`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerGroupListing {
    /// Group id.
    pub group_id: String,
    /// Protocol type reported for the group.
    pub protocol_type: String,
    /// Group state; `None` when the response carried an empty state string.
    pub state: Option<String>,
    /// Group type; `None` when the response carried an empty type string.
    pub group_type: Option<String>,
}
340
341#[derive(Debug, Clone, PartialEq, Eq)]
342/// Consumer Group Description.
343pub struct ConsumerGroupDescription {
344    /// Group Id.
345    pub group_id: String,
346    /// State.
347    pub state: String,
348    /// Protocol Type.
349    pub protocol_type: String,
350    /// Protocol Data.
351    pub protocol_data: String,
352    /// Members.
353    pub members: Vec<ConsumerGroupMemberDescription>,
354    /// Authorized Operations.
355    pub authorized_operations: Option<i32>,
356}
357
/// Description of one member of a described group.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerGroupMemberDescription {
    /// Member id.
    pub member_id: String,
    /// Static group instance id, if the member reported one.
    pub group_instance_id: Option<String>,
    /// Client id reported by the member.
    pub client_id: String,
    /// Client host reported by the member.
    pub client_host: String,
    /// Size figure for the member's metadata.
    ///
    /// NOTE(review): despite the name, `describe_consumer_groups` stores the
    /// number of subscribed topic names here, not a byte length.
    pub member_metadata_bytes: usize,
    /// Size figure for the member's assignment.
    ///
    /// NOTE(review): despite the name, `describe_consumer_groups` stores the
    /// result of `assignment_partition_count` here, not a byte length.
    pub member_assignment_bytes: usize,
}
374
/// Kind of resource a configuration belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConfigResourceType {
    /// Unknown or unrecognized resource type.
    Unknown,
    /// A topic.
    Topic,
    /// A broker.
    Broker,
    /// A broker logger.
    BrokerLogger,
    /// Client metrics.
    ClientMetrics,
    /// A group.
    Group,
}

impl ConfigResourceType {
    /// Numeric code used for this resource type on the wire.
    fn as_protocol_value(self) -> i8 {
        match self {
            ConfigResourceType::Unknown => 0,
            ConfigResourceType::Topic => 2,
            ConfigResourceType::Broker => 4,
            ConfigResourceType::BrokerLogger => 8,
            ConfigResourceType::ClientMetrics => 16,
            ConfigResourceType::Group => 32,
        }
    }

    /// Maps a wire code back to a resource type; any code that does not belong
    /// to a known type (including 0) yields `Unknown`.
    fn from_protocol_value(value: i8) -> Self {
        // Reuse `as_protocol_value` as the single source of truth for the codes.
        const KNOWN: [ConfigResourceType; 5] = [
            ConfigResourceType::Topic,
            ConfigResourceType::Broker,
            ConfigResourceType::BrokerLogger,
            ConfigResourceType::ClientMetrics,
            ConfigResourceType::Group,
        ];

        KNOWN
            .iter()
            .copied()
            .find(|kind| kind.as_protocol_value() == value)
            .unwrap_or(ConfigResourceType::Unknown)
    }
}
415
416#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
417/// Config Resource.
418pub struct ConfigResource {
419    /// Resource Type.
420    pub resource_type: ConfigResourceType,
421    /// Resource Name.
422    pub resource_name: String,
423}
424
425impl ConfigResource {
426    /// Creates a new value.
427    pub fn new(resource_type: ConfigResourceType, resource_name: impl Into<String>) -> Self {
428        Self {
429            resource_type,
430            resource_name: resource_name.into(),
431        }
432    }
433
434    /// Topic name.
435    pub fn topic(resource_name: impl Into<String>) -> Self {
436        Self::new(ConfigResourceType::Topic, resource_name)
437    }
438
439    /// Group.
440    pub fn group(resource_name: impl Into<String>) -> Self {
441        Self::new(ConfigResourceType::Group, resource_name)
442    }
443}
444
/// One configuration entry of a described resource.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConfigEntry {
    /// Configuration name.
    pub name: String,
    /// Configuration value, if one is present.
    pub value: Option<String>,
    /// Whether the entry is marked read-only.
    pub read_only: bool,
    /// Raw numeric code identifying where the value comes from.
    pub config_source: i8,
    /// Whether the entry is marked sensitive.
    pub is_sensitive: bool,
    /// Raw numeric code of the value's type, when available.
    pub config_type: Option<i8>,
    /// Documentation text for the entry, when available.
    pub documentation: Option<String>,
}
463
464#[derive(Debug, Clone, PartialEq, Eq)]
465/// Config Resource Config.
466pub struct ConfigResourceConfig {
467    /// Resource.
468    pub resource: ConfigResource,
469    /// Entries.
470    pub entries: BTreeMap<String, ConfigEntry>,
471}
472
473impl ConfigResourceConfig {
474    /// Entry.
475    pub fn entry(&self, name: &str) -> Option<&ConfigEntry> {
476        self.entries.get(name)
477    }
478}
479
/// Kind of incremental change applied to a single configuration entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlterConfigOpType {
    /// Set the entry to a value.
    Set,
    /// Delete the entry.
    Delete,
    /// Append operation.
    Append,
    /// Subtract operation.
    Subtract,
}

impl AlterConfigOpType {
    /// Numeric code used for this operation on the wire.
    fn as_protocol_value(self) -> i8 {
        match self {
            AlterConfigOpType::Set => 0,
            AlterConfigOpType::Delete => 1,
            AlterConfigOpType::Append => 2,
            AlterConfigOpType::Subtract => 3,
        }
    }
}
503
504#[derive(Debug, Clone, PartialEq, Eq)]
505/// Alter Config Op.
506pub struct AlterConfigOp {
507    /// Name returned by Kafka.
508    pub name: String,
509    /// Op Type.
510    pub op_type: AlterConfigOpType,
511    /// Record value.
512    pub value: Option<String>,
513}
514
515impl AlterConfigOp {
516    /// Set.
517    pub fn set(name: impl Into<String>, value: impl Into<String>) -> Self {
518        Self {
519            name: name.into(),
520            op_type: AlterConfigOpType::Set,
521            value: Some(value.into()),
522        }
523    }
524
525    /// Delete.
526    pub fn delete(name: impl Into<String>) -> Self {
527        Self {
528            name: name.into(),
529            op_type: AlterConfigOpType::Delete,
530            value: None,
531        }
532    }
533}
534
535#[derive(Debug, Clone)]
536/// Client for Kafka admin operations.
537pub struct KafkaAdmin {
538    config: AdminConfig,
539}
540
541impl KafkaAdmin {
542    #[instrument(
543        name = "admin.connect",
544        level = "debug",
545        skip(config),
546        fields(
547            bootstrap_server_count = config.bootstrap_servers.len(),
548            client_id = %config.client_id
549        )
550    )]
551    /// Connects to Kafka and returns the client.
552    pub async fn connect(config: AdminConfig) -> Result<Self> {
553        let admin = Self { config };
554        admin.warm_up().await?;
555        debug!("admin client connected");
556        Ok(admin)
557    }
558
559    #[instrument(name = "admin.create_topics", level = "debug", skip(self, topics))]
560    /// Create Topics.
561    pub async fn create_topics<I>(&self, topics: I) -> Result<()>
562    where
563        I: IntoIterator<Item = NewTopic>,
564    {
565        let topics = topics
566            .into_iter()
567            .map(NewTopic::into_request_topic)
568            .collect::<Result<Vec<_>>>()?;
569        if topics.is_empty() {
570            return Ok(());
571        }
572
573        let request = CreateTopicsRequest::default()
574            .with_topics(topics)
575            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
576            .with_validate_only(false);
577        let response: CreateTopicsResponse = self
578            .send_request::<CreateTopicsRequest>(CREATE_TOPICS_VERSION_CAP, &request)
579            .await?;
580
581        for topic in response.topics {
582            let name = topic.name.0.to_string();
583            if let Some(error) = topic
584                .error_code
585                .err()
586                .filter(|error| !is_ignorable_create_topic_error(*error))
587            {
588                return Err(anyhow!("create topic '{name}' failed: {error}").into());
589            }
590        }
591
592        Ok(())
593    }
594
595    #[instrument(name = "admin.delete_topics", level = "debug", skip(self, topics))]
596    /// Delete Topics.
597    pub async fn delete_topics<I, S>(&self, topics: I) -> Result<()>
598    where
599        I: IntoIterator<Item = S>,
600        S: Into<String>,
601    {
602        let topic_names = topics
603            .into_iter()
604            .map(|topic| validate_topic_name(topic.into()))
605            .collect::<Result<Vec<_>>>()?;
606        if topic_names.is_empty() {
607            return Ok(());
608        }
609
610        let request = DeleteTopicsRequest::default()
611            .with_topic_names(
612                topic_names
613                    .iter()
614                    .cloned()
615                    .map(StrBytes::from_string)
616                    .map(Into::into)
617                    .collect(),
618            )
619            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?);
620        let response: DeleteTopicsResponse = self
621            .send_request::<DeleteTopicsRequest>(DELETE_TOPICS_VERSION_CAP, &request)
622            .await?;
623
624        for topic in response.responses {
625            let name = topic
626                .name
627                .as_ref()
628                .map(|name| name.0.to_string())
629                .unwrap_or_else(|| "<unknown>".to_owned());
630            if let Some(error) = topic.error_code.err() {
631                return Err(anyhow!("delete topic '{name}' failed: {error}").into());
632            }
633        }
634
635        Ok(())
636    }
637
638    #[instrument(
639        name = "admin.create_partitions",
640        level = "debug",
641        skip(self, partitions)
642    )]
643    /// Create Partitions.
644    pub async fn create_partitions<I, S>(&self, partitions: I) -> Result<()>
645    where
646        I: IntoIterator<Item = (S, NewPartitions)>,
647        S: Into<String>,
648    {
649        let topics = partitions
650            .into_iter()
651            .map(|(topic, new_partitions)| new_partitions.into_request_topic(topic.into()))
652            .collect::<Result<Vec<_>>>()?;
653        if topics.is_empty() {
654            return Ok(());
655        }
656
657        let request = CreatePartitionsRequest::default()
658            .with_topics(topics)
659            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
660            .with_validate_only(false);
661        let response: CreatePartitionsResponse = self
662            .send_request::<CreatePartitionsRequest>(CREATE_PARTITIONS_VERSION_CAP, &request)
663            .await?;
664
665        for topic in response.results {
666            let name = topic.name.0.to_string();
667            if let Some(error) = topic.error_code.err() {
668                return Err(anyhow!(
669                    "create partitions for topic '{name}' failed: {}",
670                    topic
671                        .error_message
672                        .as_ref()
673                        .map(|message| message.to_string())
674                        .filter(|message| !message.is_empty())
675                        .unwrap_or_else(|| error.to_string())
676                )
677                .into());
678            }
679        }
680
681        Ok(())
682    }
683
684    #[instrument(name = "admin.list_topics", level = "debug", skip(self))]
685    /// List Topics.
686    pub async fn list_topics(&self) -> Result<Vec<TopicListing>> {
687        let response = self.fetch_metadata(None).await?;
688        let mut topics = Vec::new();
689
690        for topic in response.topics {
691            let Some(name) = topic.name.as_ref().map(|name| name.0.to_string()) else {
692                continue;
693            };
694            if let Some(error) = topic.error_code.err() {
695                return Err(anyhow!("list topics failed for '{name}': {error}").into());
696            }
697
698            topics.push(TopicListing {
699                name,
700                topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
701                is_internal: topic.is_internal,
702            });
703        }
704
705        topics.sort_by(|left, right| left.name.cmp(&right.name));
706        Ok(topics)
707    }
708
709    #[instrument(name = "admin.describe_topics", level = "debug", skip(self, topics))]
710    /// Describe Topics.
711    pub async fn describe_topics<I, S>(&self, topics: I) -> Result<Vec<TopicDescription>>
712    where
713        I: IntoIterator<Item = S>,
714        S: Into<String>,
715    {
716        let requested_topics = topics
717            .into_iter()
718            .map(|topic| validate_topic_name(topic.into()))
719            .collect::<Result<Vec<_>>>()?;
720        if requested_topics.is_empty() {
721            return Ok(Vec::new());
722        }
723
724        let response = self.fetch_metadata(Some(&requested_topics)).await?;
725        let mut descriptions = BTreeMap::new();
726
727        for topic in response.topics {
728            let name = topic
729                .name
730                .as_ref()
731                .map(|name| name.0.to_string())
732                .unwrap_or_default();
733            if let Some(error) = topic.error_code.err() {
734                let label = if name.is_empty() { "<unknown>" } else { &name };
735                return Err(anyhow!("describe topic '{label}' failed: {error}").into());
736            }
737
738            let mut partitions = topic
739                .partitions
740                .into_iter()
741                .map(|partition| {
742                    if let Some(error) = partition.error_code.err() {
743                        return Err(anyhow!(
744                            "describe topic '{name}' partition {} failed: {error}",
745                            partition.partition_index
746                        ));
747                    }
748
749                    Ok(TopicPartitionDescription {
750                        partition: partition.partition_index,
751                        leader_id: partition.leader_id.0,
752                        leader_epoch: partition.leader_epoch,
753                        replica_nodes: partition.replica_nodes.into_iter().map(|id| id.0).collect(),
754                        isr_nodes: partition.isr_nodes.into_iter().map(|id| id.0).collect(),
755                        offline_replicas: partition
756                            .offline_replicas
757                            .into_iter()
758                            .map(|id| id.0)
759                            .collect(),
760                    })
761                })
762                .collect::<std::result::Result<Vec<_>, _>>()?;
763            partitions.sort_by_key(|partition| partition.partition);
764
765            descriptions.insert(
766                name.clone(),
767                TopicDescription {
768                    name,
769                    topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
770                    is_internal: topic.is_internal,
771                    partitions,
772                },
773            );
774        }
775
776        requested_topics
777            .into_iter()
778            .map(|topic| {
779                descriptions.remove(&topic).ok_or_else(|| {
780                    anyhow!("metadata response did not include topic '{topic}'").into()
781                })
782            })
783            .collect()
784    }
785
786    #[instrument(name = "admin.describe_cluster", level = "debug", skip(self))]
787    /// Describe Cluster.
788    pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
789        let (mut connection, version) = self
790            .connect_with_version::<DescribeClusterRequest>(DESCRIBE_CLUSTER_VERSION_CAP)
791            .await?;
792        let mut request =
793            DescribeClusterRequest::default().with_include_cluster_authorized_operations(false);
794        if version >= 1 {
795            request = request.with_endpoint_type(1);
796        }
797        if version >= 2 {
798            request = request.with_include_fenced_brokers(true);
799        }
800
801        let response: DescribeClusterResponse = connection
802            .send_request::<DescribeClusterRequest>(&self.config.client_id, version, &request)
803            .await?;
804        if let Some(error) = response.error_code.err() {
805            return Err(anyhow!("describe cluster failed: {error}").into());
806        }
807
808        let mut brokers = response
809            .brokers
810            .into_iter()
811            .map(|broker| BrokerDescription {
812                broker_id: broker.broker_id.0,
813                host: broker.host.to_string(),
814                port: broker.port,
815                rack: broker.rack.map(|rack| rack.to_string()),
816                is_fenced: broker.is_fenced,
817            })
818            .collect::<Vec<_>>();
819        brokers.sort_by_key(|broker| broker.broker_id);
820
821        Ok(ClusterDescription {
822            cluster_id: response.cluster_id.to_string(),
823            controller_id: response.controller_id.0,
824            brokers,
825        })
826    }
827
828    #[instrument(name = "admin.list_consumer_groups", level = "debug", skip(self))]
829    /// List Consumer Groups.
830    pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
831        let response: ListGroupsResponse = self
832            .send_request::<ListGroupsRequest>(
833                LIST_GROUPS_VERSION_CAP,
834                &ListGroupsRequest::default(),
835            )
836            .await?;
837        if let Some(error) = response.error_code.err() {
838            return Err(anyhow!("list consumer groups failed: {error}").into());
839        }
840
841        let mut groups = response
842            .groups
843            .into_iter()
844            .map(|group| ConsumerGroupListing {
845                group_id: group.group_id.to_string(),
846                protocol_type: group.protocol_type.to_string(),
847                state: (!group.group_state.is_empty()).then(|| group.group_state.to_string()),
848                group_type: (!group.group_type.is_empty()).then(|| group.group_type.to_string()),
849            })
850            .collect::<Vec<_>>();
851        groups.sort_by(|left, right| left.group_id.cmp(&right.group_id));
852        Ok(groups)
853    }
854
855    #[instrument(
856        name = "admin.describe_consumer_groups",
857        level = "debug",
858        skip(self, groups)
859    )]
860    /// Describe Consumer Groups.
861    pub async fn describe_consumer_groups<I, S>(
862        &self,
863        groups: I,
864    ) -> Result<Vec<ConsumerGroupDescription>>
865    where
866        I: IntoIterator<Item = S>,
867        S: Into<String>,
868    {
869        let group_ids = groups
870            .into_iter()
871            .map(|group| validate_group_id(group.into()))
872            .collect::<Result<Vec<_>>>()?;
873        if group_ids.is_empty() {
874            return Ok(Vec::new());
875        }
876
877        let request = ConsumerGroupDescribeRequest::default()
878            .with_group_ids(
879                group_ids
880                    .iter()
881                    .cloned()
882                    .map(StrBytes::from_string)
883                    .map(Into::into)
884                    .collect(),
885            )
886            .with_include_authorized_operations(false);
887        let response: ConsumerGroupDescribeResponse = self
888            .send_request::<ConsumerGroupDescribeRequest>(
889                CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
890                &request,
891            )
892            .await?;
893
894        let mut descriptions = BTreeMap::new();
895        for group in response.groups {
896            let group_id = group.group_id.to_string();
897            if let Some(error) = group.error_code.err() {
898                let message = group
899                    .error_message
900                    .as_ref()
901                    .map(ToString::to_string)
902                    .filter(|message| !message.is_empty())
903                    .unwrap_or_else(|| error.to_string());
904                return Err(
905                    anyhow!("describe consumer group '{group_id}' failed: {message}").into(),
906                );
907            }
908
909            descriptions.insert(
910                group_id.clone(),
911                ConsumerGroupDescription {
912                    group_id,
913                    state: group.group_state.to_string(),
914                    protocol_type: "consumer".to_owned(),
915                    protocol_data: group.assignor_name.to_string(),
916                    members: group
917                        .members
918                        .into_iter()
919                        .map(|member| ConsumerGroupMemberDescription {
920                            member_id: member.member_id.to_string(),
921                            group_instance_id: member
922                                .instance_id
923                                .map(|instance_id| instance_id.to_string()),
924                            client_id: member.client_id.to_string(),
925                            client_host: member.client_host.to_string(),
926                            member_metadata_bytes: member.subscribed_topic_names.len(),
927                            member_assignment_bytes: assignment_partition_count(&member.assignment),
928                        })
929                        .collect(),
930                    authorized_operations: (group.authorized_operations != i32::MIN)
931                        .then_some(group.authorized_operations),
932                },
933            );
934        }
935
936        group_ids
937            .into_iter()
938            .map(|group_id| {
939                descriptions.remove(&group_id).ok_or_else(|| {
940                    anyhow!("describe groups response did not include group '{group_id}'").into()
941                })
942            })
943            .collect()
944    }
945
946    #[instrument(
947        name = "admin.delete_consumer_groups",
948        level = "debug",
949        skip(self, groups)
950    )]
951    /// Delete Consumer Groups.
952    pub async fn delete_consumer_groups<I, S>(&self, groups: I) -> Result<()>
953    where
954        I: IntoIterator<Item = S>,
955        S: Into<String>,
956    {
957        let group_ids = groups
958            .into_iter()
959            .map(|group| validate_group_id(group.into()))
960            .collect::<Result<Vec<_>>>()?;
961        if group_ids.is_empty() {
962            return Ok(());
963        }
964
965        let request = DeleteGroupsRequest::default().with_groups_names(
966            group_ids
967                .iter()
968                .cloned()
969                .map(StrBytes::from_string)
970                .map(Into::into)
971                .collect(),
972        );
973        let response: DeleteGroupsResponse = self
974            .send_request::<DeleteGroupsRequest>(DELETE_GROUPS_VERSION_CAP, &request)
975            .await?;
976
977        for result in response.results {
978            if let Some(error) = result.error_code.err() {
979                return Err(anyhow!(
980                    "delete consumer group '{}' failed: {error}",
981                    &*result.group_id
982                )
983                .into());
984            }
985        }
986        Ok(())
987    }
988
989    #[instrument(
990        name = "admin.describe_configs",
991        level = "debug",
992        skip(self, resources)
993    )]
994    /// Describe Configs.
995    pub async fn describe_configs<I>(&self, resources: I) -> Result<Vec<ConfigResourceConfig>>
996    where
997        I: IntoIterator<Item = ConfigResource>,
998    {
999        let resources = resources.into_iter().collect::<Vec<_>>();
1000        if resources.is_empty() {
1001            return Ok(Vec::new());
1002        }
1003
1004        let request = DescribeConfigsRequest::default()
1005            .with_resources(
1006                resources
1007                    .iter()
1008                    .map(|resource| {
1009                        DescribeConfigsResource::default()
1010                            .with_resource_type(resource.resource_type.as_protocol_value())
1011                            .with_resource_name(StrBytes::from_string(
1012                                resource.resource_name.clone(),
1013                            ))
1014                            .with_configuration_keys(None)
1015                    })
1016                    .collect(),
1017            )
1018            .with_include_synonyms(false);
1019        let response: DescribeConfigsResponse = self
1020            .send_request::<DescribeConfigsRequest>(DESCRIBE_CONFIGS_VERSION_CAP, &request)
1021            .await?;
1022
1023        let mut described = BTreeMap::new();
1024        for resource in response.results {
1025            let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
1026            let resource_name = resource.resource_name.to_string();
1027            if let Some(error) = resource.error_code.err() {
1028                return Err(anyhow!(
1029                    "describe configs for {:?} '{}' failed: {}",
1030                    resource_type,
1031                    resource_name,
1032                    resource
1033                        .error_message
1034                        .as_ref()
1035                        .map(|message| message.to_string())
1036                        .filter(|message| !message.is_empty())
1037                        .unwrap_or_else(|| error.to_string())
1038                )
1039                .into());
1040            }
1041
1042            let entries = resource
1043                .configs
1044                .into_iter()
1045                .map(|entry| {
1046                    let name = entry.name.to_string();
1047                    let config_entry = ConfigEntry {
1048                        name: name.clone(),
1049                        value: entry.value.map(|value| value.to_string()),
1050                        read_only: entry.read_only,
1051                        config_source: entry.config_source,
1052                        is_sensitive: entry.is_sensitive,
1053                        config_type: (response_supported_config_type(entry.config_type))
1054                            .then_some(entry.config_type),
1055                        documentation: entry.documentation.map(|doc| doc.to_string()),
1056                    };
1057                    (name, config_entry)
1058                })
1059                .collect();
1060
1061            described.insert(
1062                (resource_type, resource_name.clone()),
1063                ConfigResourceConfig {
1064                    resource: ConfigResource::new(resource_type, resource_name),
1065                    entries,
1066                },
1067            );
1068        }
1069
1070        resources
1071            .into_iter()
1072            .map(|resource| {
1073                described
1074                    .remove(&(resource.resource_type, resource.resource_name.clone()))
1075                    .ok_or_else(|| {
1076                        anyhow!("describe configs response did not include {:?}", resource).into()
1077                    })
1078            })
1079            .collect()
1080    }
1081
1082    #[instrument(
1083        name = "admin.incremental_alter_configs",
1084        level = "debug",
1085        skip(self, resources)
1086    )]
1087    /// Incremental Alter Configs.
1088    pub async fn incremental_alter_configs<I>(&self, resources: I) -> Result<()>
1089    where
1090        I: IntoIterator<Item = (ConfigResource, Vec<AlterConfigOp>)>,
1091    {
1092        let resources = resources
1093            .into_iter()
1094            .map(|(resource, ops)| {
1095                AlterConfigsResource::default()
1096                    .with_resource_type(resource.resource_type.as_protocol_value())
1097                    .with_resource_name(StrBytes::from_string(resource.resource_name))
1098                    .with_configs(
1099                        ops.into_iter()
1100                            .map(|op| {
1101                                AlterableConfig::default()
1102                                    .with_name(StrBytes::from_string(op.name))
1103                                    .with_config_operation(op.op_type.as_protocol_value())
1104                                    .with_value(op.value.map(StrBytes::from_string))
1105                            })
1106                            .collect(),
1107                    )
1108            })
1109            .collect::<Vec<_>>();
1110        if resources.is_empty() {
1111            return Ok(());
1112        }
1113
1114        let request = IncrementalAlterConfigsRequest::default()
1115            .with_resources(resources)
1116            .with_validate_only(false);
1117        let response: IncrementalAlterConfigsResponse = self
1118            .send_request::<IncrementalAlterConfigsRequest>(
1119                INCREMENTAL_ALTER_CONFIGS_VERSION_CAP,
1120                &request,
1121            )
1122            .await?;
1123
1124        for resource in response.responses {
1125            if let Some(error) = resource.error_code.err() {
1126                return Err(anyhow!(
1127                    "incremental alter configs for {:?} '{}' failed: {}",
1128                    ConfigResourceType::from_protocol_value(resource.resource_type),
1129                    resource.resource_name,
1130                    resource
1131                        .error_message
1132                        .as_ref()
1133                        .map(|message| message.to_string())
1134                        .filter(|message| !message.is_empty())
1135                        .unwrap_or_else(|| error.to_string())
1136                )
1137                .into());
1138            }
1139        }
1140
1141        Ok(())
1142    }
1143
1144    #[instrument(
1145        name = "admin.upsert_scram_credential",
1146        level = "debug",
1147        skip(self, user, password)
1148    )]
1149    /// Upsert Scram Credential.
1150    pub async fn upsert_scram_credential(
1151        &self,
1152        user: impl Into<String>,
1153        mechanism: SaslMechanism,
1154        password: impl AsRef<[u8]>,
1155    ) -> Result<()> {
1156        self.upsert_scram_credential_with_iterations(
1157            user,
1158            mechanism,
1159            password,
1160            scram::MIN_ITERATIONS,
1161        )
1162        .await
1163    }
1164
1165    #[instrument(
1166        name = "admin.upsert_scram_credential_with_iterations",
1167        level = "debug",
1168        skip(self, user, password)
1169    )]
1170    /// Upsert Scram Credential With Iterations.
1171    pub async fn upsert_scram_credential_with_iterations(
1172        &self,
1173        user: impl Into<String>,
1174        mechanism: SaslMechanism,
1175        password: impl AsRef<[u8]>,
1176        iterations: i32,
1177    ) -> Result<()> {
1178        let user = user.into();
1179        let mechanism_type = mechanism
1180            .scram_type()
1181            .ok_or_else(|| anyhow!("SCRAM credential upsertion requires a SCRAM mechanism"))?;
1182        let salt = scram::secure_random_bytes()?;
1183        let salted_password =
1184            scram::salted_password(mechanism, password.as_ref(), &salt, iterations)?;
1185        let request = AlterUserScramCredentialsRequest::default().with_upsertions(vec![
1186            ScramCredentialUpsertion::default()
1187                .with_name(StrBytes::from_string(user.clone()))
1188                .with_mechanism(mechanism_type)
1189                .with_iterations(iterations)
1190                .with_salt(Bytes::from(salt))
1191                .with_salted_password(Bytes::from(salted_password)),
1192        ]);
1193        let response: AlterUserScramCredentialsResponse = self
1194            .send_request::<AlterUserScramCredentialsRequest>(
1195                ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP,
1196                &request,
1197            )
1198            .await?;
1199
1200        for result in response.results {
1201            if let Some(error) = result.error_code.err() {
1202                return Err(anyhow!(
1203                    "alter SCRAM credential for user '{}' failed: {}",
1204                    result.user,
1205                    result
1206                        .error_message
1207                        .as_ref()
1208                        .map(|message| message.to_string())
1209                        .filter(|message| !message.is_empty())
1210                        .unwrap_or_else(|| error.to_string())
1211                )
1212                .into());
1213            }
1214        }
1215
1216        Ok(())
1217    }
1218
1219    /// Config.
1220    pub fn config(&self) -> &AdminConfig {
1221        &self.config
1222    }
1223
1224    /// Finalized Feature Levels.
1225    pub async fn finalized_feature_levels(&self) -> Result<Vec<BrokerFeatureLevel>> {
1226        let connection = connect_to_any_bootstrap(
1227            &self.config.bootstrap_servers,
1228            &self.config.client_id,
1229            self.config.request_timeout,
1230            self.config.security_protocol,
1231            &self.config.tls,
1232            &self.config.sasl,
1233            &self.config.tcp_connector,
1234        )
1235        .await?;
1236        Ok(connection
1237            .finalized_feature_levels()
1238            .into_iter()
1239            .map(|(name, level)| BrokerFeatureLevel { name, level })
1240            .collect())
1241    }
1242
1243    /// Updates finalized broker feature levels.
1244    pub async fn update_features<I>(&self, updates: I) -> Result<()>
1245    where
1246        I: IntoIterator<Item = FeatureUpdate>,
1247    {
1248        self.update_features_inner(updates, false).await
1249    }
1250
1251    /// Validates finalized broker feature updates without applying them.
1252    pub async fn validate_feature_updates<I>(&self, updates: I) -> Result<()>
1253    where
1254        I: IntoIterator<Item = FeatureUpdate>,
1255    {
1256        self.update_features_inner(updates, true).await
1257    }
1258
1259    async fn update_features_inner<I>(&self, updates: I, validate_only: bool) -> Result<()>
1260    where
1261        I: IntoIterator<Item = FeatureUpdate>,
1262    {
1263        let (mut connection, version) = self
1264            .connect_with_version::<UpdateFeaturesRequest>(UPDATE_FEATURES_VERSION_CAP)
1265            .await?;
1266        let feature_updates = updates
1267            .into_iter()
1268            .map(|update| update.into_request_update(version))
1269            .collect::<Result<Vec<_>>>()?;
1270        if feature_updates.is_empty() {
1271            return Ok(());
1272        }
1273
1274        let mut request = UpdateFeaturesRequest::default()
1275            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
1276            .with_feature_updates(feature_updates);
1277        if version >= 1 {
1278            request = request.with_validate_only(validate_only);
1279        } else if validate_only {
1280            return Err(anyhow!("validate-only feature updates require UpdateFeatures v1+").into());
1281        }
1282
1283        let response: UpdateFeaturesResponse = connection
1284            .send_request::<UpdateFeaturesRequest>(&self.config.client_id, version, &request)
1285            .await?;
1286        if let Some(error) = response.error_code.err() {
1287            return Err(anyhow!(
1288                "update features failed: {}",
1289                response
1290                    .error_message
1291                    .as_ref()
1292                    .map(|message| message.to_string())
1293                    .filter(|message| !message.is_empty())
1294                    .unwrap_or_else(|| error.to_string())
1295            )
1296            .into());
1297        }
1298
1299        for result in response.results {
1300            if let Some(error) = result.error_code.err() {
1301                let feature = result.feature.to_string();
1302                return Err(anyhow!(
1303                    "update feature '{feature}' failed: {}",
1304                    result
1305                        .error_message
1306                        .as_ref()
1307                        .map(|message| message.to_string())
1308                        .filter(|message| !message.is_empty())
1309                        .unwrap_or_else(|| error.to_string())
1310                )
1311                .into());
1312            }
1313        }
1314
1315        Ok(())
1316    }
1317
1318    async fn warm_up(&self) -> Result<()> {
1319        let _ = connect_to_any_bootstrap(
1320            &self.config.bootstrap_servers,
1321            &self.config.client_id,
1322            self.config.request_timeout,
1323            self.config.security_protocol,
1324            &self.config.tls,
1325            &self.config.sasl,
1326            &self.config.tcp_connector,
1327        )
1328        .await?;
1329        Ok(())
1330    }
1331
1332    async fn fetch_metadata(&self, topics: Option<&[String]>) -> Result<MetadataResponse> {
1333        let (mut connection, version) = self
1334            .connect_with_version::<MetadataRequest>(METADATA_VERSION_CAP)
1335            .await?;
1336        let request = MetadataRequest::default()
1337            .with_topics(topics.map(|topics| {
1338                topics
1339                    .iter()
1340                    .cloned()
1341                    .map(StrBytes::from_string)
1342                    .map(|name| MetadataRequestTopic::default().with_name(Some(name.into())))
1343                    .collect()
1344            }))
1345            .with_allow_auto_topic_creation(false)
1346            .with_include_cluster_authorized_operations(false)
1347            .with_include_topic_authorized_operations(false);
1348        Ok(connection
1349            .send_request::<MetadataRequest>(&self.config.client_id, version, &request)
1350            .await?)
1351    }
1352
1353    async fn send_request<Req>(&self, version_cap: i16, request: &Req) -> Result<Req::Response>
1354    where
1355        Req: Request,
1356    {
1357        let (mut connection, version) = self.connect_with_version::<Req>(version_cap).await?;
1358        Ok(connection
1359            .send_request::<Req>(&self.config.client_id, version, request)
1360            .await?)
1361    }
1362
1363    async fn connect_with_version<Req>(&self, version_cap: i16) -> Result<(BrokerConnection, i16)>
1364    where
1365        Req: Request,
1366    {
1367        let connection = connect_to_any_bootstrap(
1368            &self.config.bootstrap_servers,
1369            &self.config.client_id,
1370            self.config.request_timeout,
1371            self.config.security_protocol,
1372            &self.config.tls,
1373            &self.config.sasl,
1374            &self.config.tcp_connector,
1375        )
1376        .await?;
1377        let version = connection.version_with_cap::<Req>(version_cap)?;
1378        Ok((connection, version))
1379    }
1380}
1381
1382fn validate_topic_name(topic: String) -> Result<String> {
1383    let topic = topic.trim();
1384    if topic.is_empty() {
1385        return Err(AdminError::EmptyTopicName.into());
1386    }
1387
1388    Ok(topic.to_owned())
1389}
1390
1391fn validate_group_id(group_id: String) -> Result<String> {
1392    let group_id = group_id.trim();
1393    if group_id.is_empty() {
1394        return Err(anyhow!("consumer group id cannot be empty").into());
1395    }
1396
1397    Ok(group_id.to_owned())
1398}
1399
1400fn validate_feature_name(feature: String) -> Result<String> {
1401    let feature = feature.trim();
1402    if feature.is_empty() {
1403        return Err(anyhow!("feature names must be non-empty").into());
1404    }
1405
1406    Ok(feature.to_owned())
1407}
1408
1409fn assignment_partition_count(
1410    assignment: &kafka_protocol::messages::consumer_group_describe_response::Assignment,
1411) -> usize {
1412    assignment
1413        .topic_partitions
1414        .iter()
1415        .map(|topic| topic.partitions.len())
1416        .sum()
1417}
1418
1419fn is_ignorable_create_topic_error(error: ResponseError) -> bool {
1420    error == ResponseError::TopicAlreadyExists
1421}
1422
/// Returns `true` when `config_type` is zero or positive.
///
/// `describe_configs` uses this to decide whether a config type from the
/// response is kept as `Some(..)` or dropped to `None`.
fn response_supported_config_type(config_type: i8) -> bool {
    !config_type.is_negative()
}
1426
#[cfg(test)]
mod tests {
    //! Unit tests for request mapping, input validation, and protocol constants.
    use super::*;

    /// A valid `NewTopic` carries its name, sizing, and configs into the request.
    #[test]
    fn new_topic_maps_to_create_topics_request() {
        let builder = NewTopic::new("orders", 3, 2).with_config("cleanup.policy", "compact");
        let creatable = builder
            .into_request_topic()
            .expect("topic should be valid");

        assert_eq!(creatable.name.0.to_string(), "orders");
        assert_eq!(
            (creatable.num_partitions, creatable.replication_factor),
            (3, 2)
        );
        let config_names: Vec<String> = creatable
            .configs
            .iter()
            .map(|config| config.name.to_string())
            .collect();
        assert_eq!(config_names, ["cleanup.policy"]);
    }

    /// A partition count of zero is rejected with a descriptive message.
    #[test]
    fn new_topic_rejects_invalid_partition_count() {
        let rendered = NewTopic::new("orders", 0, 1)
            .into_request_topic()
            .expect_err("invalid topic should fail")
            .to_string();
        assert!(rendered.contains("topic partition count must be positive"));
    }

    /// Blank names and a zero replication factor map to dedicated admin errors.
    #[test]
    fn new_topic_rejects_empty_names_and_invalid_replication_factor() {
        let blank_name = NewTopic::new("  ", 1, 1).into_request_topic();
        assert!(matches!(
            blank_name,
            Err(crate::Error::Admin(AdminError::EmptyTopicName))
        ));

        let zero_replicas = NewTopic::new("orders", 1, 0).into_request_topic();
        assert!(matches!(
            zero_replicas,
            Err(crate::Error::Admin(AdminError::InvalidReplicationFactor {
                replication_factor: 0
            }))
        ));
    }

    /// `NewPartitions` forwards assignments and validates the count and topic name.
    #[test]
    fn new_partitions_maps_assignments_and_rejects_invalid_input() {
        let request_topic = NewPartitions::increase_to(4)
            .with_assignment([1, 2])
            .with_assignment([2, 3])
            .into_request_topic("orders".to_owned())
            .unwrap();
        assert_eq!(request_topic.name.to_string(), "orders");
        assert_eq!(request_topic.count, 4);
        assert_eq!(
            request_topic
                .assignments
                .map(|assignments| assignments.len()),
            Some(2)
        );

        let zero_count = NewPartitions::increase_to(0).into_request_topic("orders".to_owned());
        assert!(matches!(
            zero_count,
            Err(crate::Error::Admin(AdminError::InvalidPartitionCount {
                partitions: 0
            }))
        ));

        let blank_topic = NewPartitions::increase_to(1).into_request_topic(" ".to_owned());
        assert!(matches!(
            blank_topic,
            Err(crate::Error::Admin(AdminError::EmptyTopicName))
        ));
    }

    /// Wire values for resource types and alter operations must not drift.
    #[test]
    fn config_resource_and_operation_protocol_values_are_stable() {
        let resource_codes = [
            (ConfigResourceType::Unknown, 0),
            (ConfigResourceType::Topic, 2),
            (ConfigResourceType::Broker, 4),
            (ConfigResourceType::BrokerLogger, 8),
            (ConfigResourceType::ClientMetrics, 16),
            (ConfigResourceType::Group, 32),
        ];
        for (resource_type, code) in resource_codes {
            assert_eq!(resource_type.as_protocol_value(), code);
        }

        assert_eq!(
            ConfigResourceType::from_protocol_value(2),
            ConfigResourceType::Topic
        );
        assert_eq!(
            ConfigResourceType::from_protocol_value(99),
            ConfigResourceType::Unknown
        );

        let set_op = AlterConfigOp::set("cleanup.policy", "compact");
        assert_eq!(set_op.op_type.as_protocol_value(), 0);
        let delete_op = AlterConfigOp::delete("retention.ms");
        assert_eq!(delete_op.op_type.as_protocol_value(), 1);
        assert_eq!(AlterConfigOpType::Append.as_protocol_value(), 2);
        assert_eq!(AlterConfigOpType::Subtract.as_protocol_value(), 3);

        assert!(response_supported_config_type(0));
        assert!(!response_supported_config_type(-1));
    }

    /// `ConfigResourceConfig::entry` finds entries by name and misses cleanly.
    #[test]
    fn config_resource_config_entry_lookup_returns_named_entry() {
        let policy_entry = ConfigEntry {
            name: "cleanup.policy".to_owned(),
            value: Some("compact".to_owned()),
            read_only: false,
            config_source: 1,
            is_sensitive: false,
            config_type: Some(2),
            documentation: None,
        };
        let mut entries = BTreeMap::new();
        entries.insert("cleanup.policy".to_owned(), policy_entry);
        let described = ConfigResourceConfig {
            resource: ConfigResource::topic("orders"),
            entries,
        };

        let found = described.entry("cleanup.policy").unwrap();
        assert_eq!(found.value.as_deref(), Some("compact"));
        assert!(described.entry("missing").is_none());
    }

    /// At version 2 an upgrade is expressed via `upgrade_type`, not the legacy flag.
    #[test]
    fn feature_update_maps_to_modern_update_features_request() {
        let modern = FeatureUpdate::upgrade("share.version", 1)
            .into_request_update(2)
            .unwrap();
        assert_eq!(modern.feature.to_string(), "share.version");
        assert_eq!(modern.max_version_level, 1);
        assert_eq!(modern.upgrade_type, 1);
        assert!(!modern.allow_downgrade);
    }

    /// At version 0 a safe downgrade sets the legacy `allow_downgrade` flag.
    #[test]
    fn feature_update_maps_to_legacy_downgrade_flag() {
        let legacy = FeatureUpdate::safe_downgrade("metadata.version", 0)
            .into_request_update(0)
            .unwrap();
        assert_eq!(legacy.feature.to_string(), "metadata.version");
        assert_eq!(legacy.max_version_level, 0);
        assert_eq!(legacy.upgrade_type, 1);
        assert!(legacy.allow_downgrade);
    }

    /// A whitespace-only feature name is rejected.
    #[test]
    fn feature_update_rejects_empty_name() {
        let rendered = FeatureUpdate::upgrade(" ", 1)
            .into_request_update(2)
            .unwrap_err()
            .to_string();
        assert!(rendered.contains("feature names must be non-empty"));
    }

    /// Only `TopicAlreadyExists` counts as ignorable when creating topics.
    #[test]
    fn topic_already_exists_is_ignorable() {
        let cases = [
            (ResponseError::TopicAlreadyExists, true),
            (ResponseError::UnknownTopicOrPartition, false),
        ];
        for (error, ignorable) in cases {
            assert_eq!(is_ignorable_create_topic_error(error), ignorable);
        }
    }
}