snakeice_rdkafka/
admin.rs

1//! Admin client.
2//!
3//! The main object is the [`AdminClient`] struct.
4//!
5//! [`AdminClient`]: struct.AdminClient.html
6
7use std::collections::HashMap;
8use std::ffi::{c_void, CStr, CString};
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::thread::{self, JoinHandle};
15use std::time::Duration;
16
17use futures_channel::oneshot;
18use futures_util::future::{self, Either, FutureExt};
19use futures_util::ready;
20
21use rdkafka_sys as rdsys;
22use rdkafka_sys::types::*;
23
24use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
25use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
26use crate::error::{IsError, KafkaError, KafkaResult};
27use crate::log::{trace, warn};
28use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
29use crate::TopicPartitionList;
30
31//
32// ********** ADMIN CLIENT **********
33//
34
35/// A client for the Kafka admin API.
36///
37/// `AdminClient` provides programmatic access to managing a Kafka cluster,
38/// notably manipulating topics, partitions, and configuration paramaters.
39pub struct AdminClient<C: ClientContext> {
40    client: Client<C>,
41    queue: Arc<NativeQueue>,
42    should_stop: Arc<AtomicBool>,
43    handle: Option<JoinHandle<()>>,
44}
45
46impl<C: ClientContext> AdminClient<C> {
47    /// Creates new topics according to the provided `NewTopic` specifications.
48    ///
49    /// Note that while the API supports creating multiple topics at once, it
50    /// is not transactional. Creation of some topics may succeed while others
51    /// fail. Be sure to check the result of each individual operation.
52    pub fn create_topics<'a, I>(
53        &self,
54        topics: I,
55        opts: &AdminOptions,
56    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
57    where
58        I: IntoIterator<Item = &'a NewTopic<'a>>,
59    {
60        match self.create_topics_inner(topics, opts) {
61            Ok(rx) => Either::Left(CreateTopicsFuture { rx }),
62            Err(err) => Either::Right(future::err(err)),
63        }
64    }
65
66    fn create_topics_inner<'a, I>(
67        &self,
68        topics: I,
69        opts: &AdminOptions,
70    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
71    where
72        I: IntoIterator<Item = &'a NewTopic<'a>>,
73    {
74        let mut native_topics = Vec::new();
75        let mut err_buf = ErrBuf::new();
76        for t in topics {
77            native_topics.push(t.to_native(&mut err_buf)?);
78        }
79        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
80        unsafe {
81            rdsys::rd_kafka_CreateTopics(
82                self.client.native_ptr(),
83                native_topics.as_c_array(),
84                native_topics.len(),
85                native_opts.ptr(),
86                self.queue.ptr(),
87            );
88        }
89        Ok(rx)
90    }
91
92    /// Deletes the named topics.
93    ///
94    /// Note that while the API supports deleting multiple topics at once, it is
95    /// not transactional. Deletion of some topics may succeed while others
96    /// fail. Be sure to check the result of each individual operation.
97    pub fn delete_topics(
98        &self,
99        topic_names: &[&str],
100        opts: &AdminOptions,
101    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
102        match self.delete_topics_inner(topic_names, opts) {
103            Ok(rx) => Either::Left(DeleteTopicsFuture { rx }),
104            Err(err) => Either::Right(future::err(err)),
105        }
106    }
107
108    fn delete_topics_inner(
109        &self,
110        topic_names: &[&str],
111        opts: &AdminOptions,
112    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
113        let mut native_topics = Vec::new();
114        let mut err_buf = ErrBuf::new();
115        for tn in topic_names {
116            let tn_c = CString::new(*tn)?;
117            let native_topic = unsafe {
118                NativeDeleteTopic::from_ptr(rdsys::rd_kafka_DeleteTopic_new(tn_c.as_ptr())).unwrap()
119            };
120            native_topics.push(native_topic);
121        }
122        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
123        unsafe {
124            rdsys::rd_kafka_DeleteTopics(
125                self.client.native_ptr(),
126                native_topics.as_c_array(),
127                native_topics.len(),
128                native_opts.ptr(),
129                self.queue.ptr(),
130            );
131        }
132        Ok(rx)
133    }
134
135    /// Deletes the named groups.
136    pub fn delete_groups(
137        &self,
138        group_names: &[&str],
139        opts: &AdminOptions,
140    ) -> impl Future<Output = KafkaResult<Vec<GroupResult>>> {
141        match self.delete_groups_inner(group_names, opts) {
142            Ok(rx) => Either::Left(DeleteGroupsFuture { rx }),
143            Err(err) => Either::Right(future::err(err)),
144        }
145    }
146
147    fn delete_groups_inner(
148        &self,
149        group_names: &[&str],
150        opts: &AdminOptions,
151    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
152        let mut native_groups = Vec::new();
153        let mut err_buf = ErrBuf::new();
154        for gn in group_names {
155            let gn_t = CString::new(*gn)?;
156            let native_group = unsafe {
157                NativeDeleteGroup::from_ptr(rdsys::rd_kafka_DeleteGroup_new(gn_t.as_ptr())).unwrap()
158            };
159            native_groups.push(native_group);
160        }
161        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
162
163        unsafe {
164            rdsys::rd_kafka_DeleteGroups(
165                self.client.native_ptr(),
166                native_groups.as_c_array(),
167                native_groups.len(),
168                native_opts.ptr(),
169                self.queue.ptr(),
170            )
171        }
172        Ok(rx)
173    }
174
175    /// Adds additional partitions to existing topics according to the provided
176    /// `NewPartitions` specifications.
177    ///
178    /// Note that while the API supports creating partitions for multiple topics
179    /// at once, it is not transactional. Creation of partitions for some topics
180    /// may succeed while others fail. Be sure to check the result of each
181    /// individual operation.
182    pub fn create_partitions<'a, I>(
183        &self,
184        partitions: I,
185        opts: &AdminOptions,
186    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
187    where
188        I: IntoIterator<Item = &'a NewPartitions<'a>>,
189    {
190        match self.create_partitions_inner(partitions, opts) {
191            Ok(rx) => Either::Left(CreatePartitionsFuture { rx }),
192            Err(err) => Either::Right(future::err(err)),
193        }
194    }
195
196    fn create_partitions_inner<'a, I>(
197        &self,
198        partitions: I,
199        opts: &AdminOptions,
200    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
201    where
202        I: IntoIterator<Item = &'a NewPartitions<'a>>,
203    {
204        let mut native_partitions = Vec::new();
205        let mut err_buf = ErrBuf::new();
206        for p in partitions {
207            native_partitions.push(p.to_native(&mut err_buf)?);
208        }
209        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
210        unsafe {
211            rdsys::rd_kafka_CreatePartitions(
212                self.client.native_ptr(),
213                native_partitions.as_c_array(),
214                native_partitions.len(),
215                native_opts.ptr(),
216                self.queue.ptr(),
217            );
218        }
219        Ok(rx)
220    }
221
222    /// Deletes records from a topic.
223    ///
224    /// The provided `offsets` is a topic partition list specifying which
225    /// records to delete from a list of topic partitions. For each entry in the
226    /// list, the messages at offsets before the specified offsets (exclusive)
227    /// in the specified partition will be deleted. Use offset [`Offset::End`]
228    /// to delete all records in the partition.
229    ///
230    /// Returns a topic partition list describing the result of the deletion. If
231    /// the operation succeeded for a partition, the offset for that partition
232    /// will be set to the post-deletion low-water mark for that partition. If
233    /// the operation failed for a partition, there will be an error for that
234    /// partition's entry in the list.
235    pub fn delete_records(
236        &self,
237        offsets: &TopicPartitionList,
238        opts: &AdminOptions,
239    ) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
240        match self.delete_records_inner(offsets, opts) {
241            Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
242            Err(err) => Either::Right(future::err(err)),
243        }
244    }
245
246    fn delete_records_inner(
247        &self,
248        offsets: &TopicPartitionList,
249        opts: &AdminOptions,
250    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
251        let mut err_buf = ErrBuf::new();
252        let delete_records = unsafe {
253            NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr()))
254        }
255        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
256        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
257        unsafe {
258            rdsys::rd_kafka_DeleteRecords(
259                self.client.native_ptr(),
260                &mut delete_records.ptr(),
261                1,
262                native_opts.ptr(),
263                self.queue.ptr(),
264            );
265        }
266        Ok(rx)
267    }
268
269    /// Retrieves the configuration parameters for the specified resources.
270    ///
271    /// Note that while the API supports describing multiple configurations at
272    /// once, it is not transactional. There is no guarantee that you will see
273    /// a consistent snapshot of the configuration across different resources.
274    pub fn describe_configs<'a, I>(
275        &self,
276        configs: I,
277        opts: &AdminOptions,
278    ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
279    where
280        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
281    {
282        match self.describe_configs_inner(configs, opts) {
283            Ok(rx) => Either::Left(DescribeConfigsFuture { rx }),
284            Err(err) => Either::Right(future::err(err)),
285        }
286    }
287
288    fn describe_configs_inner<'a, I>(
289        &self,
290        configs: I,
291        opts: &AdminOptions,
292    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
293    where
294        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
295    {
296        let mut native_configs = Vec::new();
297        let mut err_buf = ErrBuf::new();
298        for c in configs {
299            let (name, typ) = match c {
300                ResourceSpecifier::Topic(name) => (
301                    CString::new(*name)?,
302                    RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
303                ),
304                ResourceSpecifier::Group(name) => (
305                    CString::new(*name)?,
306                    RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
307                ),
308                ResourceSpecifier::Broker(id) => (
309                    CString::new(format!("{}", id))?,
310                    RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
311                ),
312            };
313            native_configs.push(unsafe {
314                NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(
315                    typ,
316                    name.as_ptr(),
317                ))
318                .unwrap()
319            });
320        }
321        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
322        unsafe {
323            rdsys::rd_kafka_DescribeConfigs(
324                self.client.native_ptr(),
325                native_configs.as_c_array(),
326                native_configs.len(),
327                native_opts.ptr(),
328                self.queue.ptr(),
329            );
330        }
331        Ok(rx)
332    }
333
334    /// Sets configuration parameters for the specified resources,
335    /// resetting unspecified parameters to their default values.
336    ///
337    /// Note that while the API supports altering multiple resources at once, it
338    /// is not transactional. Alteration of some resources may succeed while
339    /// others fail. Be sure to check the result of each individual operation.
340    pub fn alter_configs<'a, I>(
341        &self,
342        configs: I,
343        opts: &AdminOptions,
344    ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
345    where
346        I: IntoIterator<Item = &'a AlterConfig<'a>>,
347    {
348        match self.alter_configs_inner(configs, opts) {
349            Ok(rx) => Either::Left(AlterConfigsFuture { rx }),
350            Err(err) => Either::Right(future::err(err)),
351        }
352    }
353
354    fn alter_configs_inner<'a, I>(
355        &self,
356        configs: I,
357        opts: &AdminOptions,
358    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
359    where
360        I: IntoIterator<Item = &'a AlterConfig<'a>>,
361    {
362        let mut native_configs = Vec::new();
363        let mut err_buf = ErrBuf::new();
364        for c in configs {
365            native_configs.push(c.to_native(&mut err_buf)?);
366        }
367        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
368        unsafe {
369            rdsys::rd_kafka_AlterConfigs(
370                self.client.native_ptr(),
371                native_configs.as_c_array(),
372                native_configs.len(),
373                native_opts.ptr(),
374                self.queue.ptr(),
375            );
376        }
377        Ok(rx)
378    }
379
380    /// Returns the client underlying this admin client.
381    pub fn inner(&self) -> &Client<C> {
382        &self.client
383    }
384}
385
386impl FromClientConfig for AdminClient<DefaultClientContext> {
387    fn from_config(config: &ClientConfig) -> KafkaResult<AdminClient<DefaultClientContext>> {
388        AdminClient::from_config_and_context(config, DefaultClientContext)
389    }
390}
391
392impl<C: ClientContext> FromClientConfigAndContext<C> for AdminClient<C> {
393    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<AdminClient<C>> {
394        let native_config = config.create_native_config()?;
395        // librdkafka only provides consumer and producer types. We follow the
396        // example of the Python bindings in choosing to pretend to be a
397        // producer, as producer clients are allegedly more lightweight. [0]
398        //
399        // [0]: https://github.com/confluentinc/confluent-kafka-python/blob/bfb07dfbca47c256c840aaace83d3fe26c587360/confluent_kafka/src/Admin.c#L1492-L1493
400        let client = Client::new(
401            config,
402            native_config,
403            RDKafkaType::RD_KAFKA_PRODUCER,
404            context,
405        )?;
406        let queue = Arc::new(client.new_native_queue());
407        let should_stop = Arc::new(AtomicBool::new(false));
408        let handle = start_poll_thread(queue.clone(), should_stop.clone());
409        Ok(AdminClient {
410            client,
411            queue,
412            should_stop,
413            handle: Some(handle),
414        })
415    }
416}
417
418impl<C: ClientContext> Drop for AdminClient<C> {
419    fn drop(&mut self) {
420        trace!("Stopping polling");
421        self.should_stop.store(true, Ordering::Relaxed);
422        trace!("Waiting for polling thread termination");
423        match self.handle.take().unwrap().join() {
424            Ok(()) => trace!("Polling stopped"),
425            Err(e) => warn!("Failure while terminating thread: {:?}", e),
426        };
427    }
428}
429
430fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> JoinHandle<()> {
431    thread::Builder::new()
432        .name("admin client polling thread".into())
433        .spawn(move || {
434            trace!("Admin polling thread loop started");
435            loop {
436                let event = queue.poll(Duration::from_millis(100));
437                if event.is_null() {
438                    if should_stop.load(Ordering::Relaxed) {
439                        // We received nothing and the thread should stop, so
440                        // break the loop.
441                        break;
442                    }
443                    continue;
444                }
445                let event = unsafe { NativeEvent::from_ptr(event).unwrap() };
446                let tx: Box<oneshot::Sender<NativeEvent>> =
447                    unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
448                let _ = tx.send(event);
449            }
450            trace!("Admin polling thread loop terminated");
451        })
452        .expect("Failed to start polling thread")
453}
454
455pub(crate) type NativeEvent = NativePtr<RDKafkaEvent>;
456
457unsafe impl KafkaDrop for RDKafkaEvent {
458    const TYPE: &'static str = "event";
459    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_event_destroy;
460}
461
462unsafe impl Send for NativeEvent {}
463unsafe impl Sync for NativeEvent {}
464
465impl NativePtr<RDKafkaEvent> {
466    fn check_error(&self) -> KafkaResult<()> {
467        let err = unsafe { rdsys::rd_kafka_event_error(self.ptr()) };
468        if err.is_error() {
469            Err(KafkaError::AdminOp(err.into()))
470        } else {
471            Ok(())
472        }
473    }
474}
475
476//
477// ********** ADMIN OPTIONS **********
478//
479
480/// Options for an admin API request.
481#[derive(Default)]
482pub struct AdminOptions {
483    request_timeout: Option<Timeout>,
484    operation_timeout: Option<Timeout>,
485    validate_only: bool,
486    broker_id: Option<i32>,
487}
488
489impl AdminOptions {
490    /// Creates a new `AdminOptions`.
491    pub fn new() -> AdminOptions {
492        AdminOptions::default()
493    }
494
495    /// Sets the overall request timeout, including broker lookup, request
496    /// transmission, operation time on broker, and response.
497    ///
498    /// Defaults to the `socket.timeout.ms` configuration parameter.
499    pub fn request_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
500        self.request_timeout = timeout.map(Into::into);
501        self
502    }
503
504    /// Sets the broker's operation timeout, such as the timeout for
505    /// CreateTopics to complete the creation of topics on the controller before
506    /// returning a result to the application.
507    ///
508    /// If unset (the default), the API calls will return immediately after
509    /// triggering the operation.
510    ///
511    /// Only the CreateTopics, DeleteTopics, and CreatePartitions API calls
512    /// respect this option.
513    pub fn operation_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
514        self.operation_timeout = timeout.map(Into::into);
515        self
516    }
517
518    /// Tells the broker to only validate the request, without performing the
519    /// requested operation.
520    ///
521    /// Defaults to false.
522    pub fn validate_only(mut self, validate_only: bool) -> Self {
523        self.validate_only = validate_only;
524        self
525    }
526
527    /// Override what broker the admin request will be sent to.
528    ///
529    /// By default, a reasonable broker will be selected automatically. See the
530    /// librdkafka docs on `rd_kafka_AdminOptions_set_broker` for details.
531    pub fn broker_id<T: Into<Option<i32>>>(mut self, broker_id: T) -> Self {
532        self.broker_id = broker_id.into();
533        self
534    }
535
536    fn to_native(
537        &self,
538        client: *mut RDKafka,
539        err_buf: &mut ErrBuf,
540    ) -> KafkaResult<(NativeAdminOptions, oneshot::Receiver<NativeEvent>)> {
541        let native_opts = unsafe {
542            NativeAdminOptions::from_ptr(rdsys::rd_kafka_AdminOptions_new(
543                client,
544                RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY,
545            ))
546            .unwrap()
547        };
548
549        if let Some(timeout) = self.request_timeout {
550            let res = unsafe {
551                rdsys::rd_kafka_AdminOptions_set_request_timeout(
552                    native_opts.ptr(),
553                    timeout.as_millis(),
554                    err_buf.as_mut_ptr(),
555                    err_buf.capacity(),
556                )
557            };
558            check_rdkafka_invalid_arg(res, err_buf)?;
559        }
560
561        if let Some(timeout) = self.operation_timeout {
562            let res = unsafe {
563                rdsys::rd_kafka_AdminOptions_set_operation_timeout(
564                    native_opts.ptr(),
565                    timeout.as_millis(),
566                    err_buf.as_mut_ptr(),
567                    err_buf.capacity(),
568                )
569            };
570            check_rdkafka_invalid_arg(res, err_buf)?;
571        }
572
573        if self.validate_only {
574            let res = unsafe {
575                rdsys::rd_kafka_AdminOptions_set_validate_only(
576                    native_opts.ptr(),
577                    1, // true
578                    err_buf.as_mut_ptr(),
579                    err_buf.capacity(),
580                )
581            };
582            check_rdkafka_invalid_arg(res, err_buf)?;
583        }
584
585        if let Some(broker_id) = self.broker_id {
586            let res = unsafe {
587                rdsys::rd_kafka_AdminOptions_set_broker(
588                    native_opts.ptr(),
589                    broker_id,
590                    err_buf.as_mut_ptr(),
591                    err_buf.capacity(),
592                )
593            };
594            check_rdkafka_invalid_arg(res, err_buf)?;
595        }
596
597        let (tx, rx) = oneshot::channel();
598        let tx = Box::into_raw(Box::new(tx)) as *mut c_void;
599        unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr(), tx) };
600
601        Ok((native_opts, rx))
602    }
603}
604
605unsafe impl KafkaDrop for RDKafkaAdminOptions {
606    const TYPE: &'static str = "admin options";
607    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_AdminOptions_destroy;
608}
609
610type NativeAdminOptions = NativePtr<RDKafkaAdminOptions>;
611
612fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResult<()> {
613    match res.into() {
614        RDKafkaErrorCode::NoError => Ok(()),
615        RDKafkaErrorCode::InvalidArgument => {
616            let msg = if err_buf.len() == 0 {
617                "invalid argument".into()
618            } else {
619                err_buf.to_string()
620            };
621            Err(KafkaError::AdminOpCreation(msg))
622        }
623        res => Err(KafkaError::AdminOpCreation(format!(
624            "setting admin options returned unexpected error code {}",
625            res
626        ))),
627    }
628}
629
630//
631// ********** RESPONSE HANDLING **********
632//
633
634/// The result of an individual CreateTopic, DeleteTopic, or
635/// CreatePartition operation.
636pub type TopicResult = Result<String, (String, RDKafkaErrorCode)>;
637
638fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Vec<TopicResult> {
639    let mut out = Vec::with_capacity(n);
640    for i in 0..n {
641        let topic = unsafe { *topics.add(i) };
642        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_topic_result_name(topic)) };
643        let err = unsafe { rdsys::rd_kafka_topic_result_error(topic) };
644        if err.is_error() {
645            out.push(Err((name, err.into())));
646        } else {
647            out.push(Ok(name));
648        }
649    }
650    out
651}
652
653/// The result of a DeleteGroup operation.
654pub type GroupResult = Result<String, (String, RDKafkaErrorCode)>;
655
656fn build_group_results(groups: *const *const RDKafkaGroupResult, n: usize) -> Vec<GroupResult> {
657    let mut out = Vec::with_capacity(n);
658    for i in 0..n {
659        let group = unsafe { *groups.add(i) };
660        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_group_result_name(group)) };
661        let err = unsafe {
662            let err = rdsys::rd_kafka_group_result_error(group);
663            rdsys::rd_kafka_error_code(err)
664        };
665        if err.is_error() {
666            out.push(Err((name, err.into())));
667        } else {
668            out.push(Ok(name));
669        }
670    }
671    out
672}
673
674//
675// Create topic handling
676//
677
678/// Configuration for a CreateTopic operation.
679#[derive(Debug)]
680pub struct NewTopic<'a> {
681    /// The name of the new topic.
682    pub name: &'a str,
683    /// The initial number of partitions.
684    pub num_partitions: i32,
685    /// The initial replication configuration.
686    pub replication: TopicReplication<'a>,
687    /// The initial configuration parameters for the topic.
688    pub config: Vec<(&'a str, &'a str)>,
689}
690
691impl<'a> NewTopic<'a> {
692    /// Creates a new `NewTopic`.
693    pub fn new(
694        name: &'a str,
695        num_partitions: i32,
696        replication: TopicReplication<'a>,
697    ) -> NewTopic<'a> {
698        NewTopic {
699            name,
700            num_partitions,
701            replication,
702            config: Vec::new(),
703        }
704    }
705
706    /// Sets a new parameter in the initial topic configuration.
707    pub fn set(mut self, key: &'a str, value: &'a str) -> NewTopic<'a> {
708        self.config.push((key, value));
709        self
710    }
711
712    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewTopic> {
713        let name = CString::new(self.name)?;
714        let repl = match self.replication {
715            TopicReplication::Fixed(n) => n,
716            TopicReplication::Variable(partitions) => {
717                if partitions.len() as i32 != self.num_partitions {
718                    return Err(KafkaError::AdminOpCreation(format!(
719                        "replication configuration for topic '{}' assigns {} partition(s), \
720                         which does not match the specified number of partitions ({})",
721                        self.name,
722                        partitions.len(),
723                        self.num_partitions,
724                    )));
725                }
726                -1
727            }
728        };
729        // N.B.: we wrap topic immediately, so that it is destroyed via the
730        // NativeNewTopic's Drop implementation if replica assignment or config
731        // installation fails.
732        let topic = unsafe {
733            NativeNewTopic::from_ptr(rdsys::rd_kafka_NewTopic_new(
734                name.as_ptr(),
735                self.num_partitions,
736                repl,
737                err_buf.as_mut_ptr(),
738                err_buf.capacity(),
739            ))
740        }
741        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
742
743        if let TopicReplication::Variable(assignment) = self.replication {
744            for (partition_id, broker_ids) in assignment.iter().enumerate() {
745                let res = unsafe {
746                    rdsys::rd_kafka_NewTopic_set_replica_assignment(
747                        topic.ptr(),
748                        partition_id as i32,
749                        broker_ids.as_ptr() as *mut i32,
750                        broker_ids.len(),
751                        err_buf.as_mut_ptr(),
752                        err_buf.capacity(),
753                    )
754                };
755                check_rdkafka_invalid_arg(res, err_buf)?;
756            }
757        }
758        for (key, val) in &self.config {
759            let key_c = CString::new(*key)?;
760            let val_c = CString::new(*val)?;
761            let res = unsafe {
762                rdsys::rd_kafka_NewTopic_set_config(topic.ptr(), key_c.as_ptr(), val_c.as_ptr())
763            };
764            check_rdkafka_invalid_arg(res, err_buf)?;
765        }
766        Ok(topic)
767    }
768}
769
770/// An assignment of partitions to replicas.
771///
772/// Each element in the outer slice corresponds to the partition with that
773/// index. The inner slice specifies the broker IDs to which replicas of that
774/// partition should be assigned.
775pub type PartitionAssignment<'a> = &'a [&'a [i32]];
776
777/// Replication configuration for a new topic.
778#[derive(Debug)]
779pub enum TopicReplication<'a> {
780    /// All partitions should use the same fixed replication factor.
781    Fixed(i32),
782    /// Each partition should use the replica assignment from
783    /// `PartitionAssignment`.
784    Variable(PartitionAssignment<'a>),
785}
786
787type NativeNewTopic = NativePtr<RDKafkaNewTopic>;
788
789unsafe impl KafkaDrop for RDKafkaNewTopic {
790    const TYPE: &'static str = "new topic";
791    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewTopic_destroy;
792}
793
794struct CreateTopicsFuture {
795    rx: oneshot::Receiver<NativeEvent>,
796}
797
798impl Future for CreateTopicsFuture {
799    type Output = KafkaResult<Vec<TopicResult>>;
800
801    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
802        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
803        event.check_error()?;
804        let res = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) };
805        if res.is_null() {
806            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
807            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
808                "create topics request received response of incorrect type ({})",
809                typ
810            ))));
811        }
812        let mut n = 0;
813        let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(res, &mut n) };
814        Poll::Ready(Ok(build_topic_results(topics, n)))
815    }
816}
817
818//
819// Delete topic handling
820//
821
822type NativeDeleteTopic = NativePtr<RDKafkaDeleteTopic>;
823
824unsafe impl KafkaDrop for RDKafkaDeleteTopic {
825    const TYPE: &'static str = "delete topic";
826    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteTopic_destroy;
827}
828
829struct DeleteTopicsFuture {
830    rx: oneshot::Receiver<NativeEvent>,
831}
832
833impl Future for DeleteTopicsFuture {
834    type Output = KafkaResult<Vec<TopicResult>>;
835
836    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
837        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
838        event.check_error()?;
839        let res = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) };
840        if res.is_null() {
841            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
842            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
843                "delete topics request received response of incorrect type ({})",
844                typ
845            ))));
846        }
847        let mut n = 0;
848        let topics = unsafe { rdsys::rd_kafka_DeleteTopics_result_topics(res, &mut n) };
849        Poll::Ready(Ok(build_topic_results(topics, n)))
850    }
851}
852
853//
854// Delete group handling
855//
856
857type NativeDeleteGroup = NativePtr<RDKafkaDeleteGroup>;
858
859unsafe impl KafkaDrop for RDKafkaDeleteGroup {
860    const TYPE: &'static str = "delete group";
861    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteGroup_destroy;
862}
863
864struct DeleteGroupsFuture {
865    rx: oneshot::Receiver<NativeEvent>,
866}
867
868impl Future for DeleteGroupsFuture {
869    type Output = KafkaResult<Vec<GroupResult>>;
870
871    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
872        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
873        event.check_error()?;
874        let res = unsafe { rdsys::rd_kafka_event_DeleteGroups_result(event.ptr()) };
875        if res.is_null() {
876            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
877            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
878                "delete groups request received response of incorrect type ({})",
879                typ
880            ))));
881        }
882        let mut n = 0;
883        let groups = unsafe { rdsys::rd_kafka_DeleteGroups_result_groups(res, &mut n) };
884        Poll::Ready(Ok(build_group_results(groups, n)))
885    }
886}
887
888//
889// Create partitions handling
890//
891
892/// Configuration for a CreatePartitions operation.
893pub struct NewPartitions<'a> {
894    /// The name of the topic to which partitions should be added.
895    pub topic_name: &'a str,
896    /// The total number of partitions after the operation completes.
897    pub new_partition_count: usize,
898    /// The replica assignments for the new partitions.
899    pub assignment: Option<PartitionAssignment<'a>>,
900}
901
902impl<'a> NewPartitions<'a> {
903    /// Creates a new `NewPartitions`.
904    pub fn new(topic_name: &'a str, new_partition_count: usize) -> NewPartitions<'a> {
905        NewPartitions {
906            topic_name,
907            new_partition_count,
908            assignment: None,
909        }
910    }
911
912    /// Sets the partition replica assignment for the new partitions. Only
913    /// assignments for newly created replicas should be included.
914    pub fn assign(mut self, assignment: PartitionAssignment<'a>) -> NewPartitions<'a> {
915        self.assignment = Some(assignment);
916        self
917    }
918
919    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewPartitions> {
920        let name = CString::new(self.topic_name)?;
921        if let Some(assignment) = self.assignment {
922            // If assignment contains more than self.new_partition_count
923            // entries, we'll trip an assertion in librdkafka that crashes the
924            // process. Note that this check isn't a guarantee that the
925            // partition assignment is valid, since the assignment should only
926            // contain entries for the *new* partitions added, and not any
927            // existing partitions, but we can let the server handle that
928            // validation--we just need to make sure not to crash librdkafka.
929            if assignment.len() > self.new_partition_count {
930                return Err(KafkaError::AdminOpCreation(format!(
931                    "partition assignment for topic '{}' assigns {} partition(s), \
932                     which is more than the requested total number of partitions ({})",
933                    self.topic_name,
934                    assignment.len(),
935                    self.new_partition_count,
936                )));
937            }
938        }
939        // N.B.: we wrap partition immediately, so that it is destroyed via
940        // NativeNewPartitions's Drop implementation if replica assignment or
941        // config installation fails.
942        let partitions = unsafe {
943            NativeNewPartitions::from_ptr(rdsys::rd_kafka_NewPartitions_new(
944                name.as_ptr(),
945                self.new_partition_count,
946                err_buf.as_mut_ptr(),
947                err_buf.capacity(),
948            ))
949        }
950        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
951
952        if let Some(assignment) = self.assignment {
953            for (partition_id, broker_ids) in assignment.iter().enumerate() {
954                let res = unsafe {
955                    rdsys::rd_kafka_NewPartitions_set_replica_assignment(
956                        partitions.ptr(),
957                        partition_id as i32,
958                        broker_ids.as_ptr() as *mut i32,
959                        broker_ids.len(),
960                        err_buf.as_mut_ptr(),
961                        err_buf.capacity(),
962                    )
963                };
964                check_rdkafka_invalid_arg(res, err_buf)?;
965            }
966        }
967        Ok(partitions)
968    }
969}
970
971type NativeNewPartitions = NativePtr<RDKafkaNewPartitions>;
972
973unsafe impl KafkaDrop for RDKafkaNewPartitions {
974    const TYPE: &'static str = "new partitions";
975    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewPartitions_destroy;
976}
977
978struct CreatePartitionsFuture {
979    rx: oneshot::Receiver<NativeEvent>,
980}
981
982impl Future for CreatePartitionsFuture {
983    type Output = KafkaResult<Vec<TopicResult>>;
984
985    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
986        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
987        event.check_error()?;
988        let res = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) };
989        if res.is_null() {
990            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
991            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
992                "create partitions request received response of incorrect type ({})",
993                typ
994            ))));
995        }
996        let mut n = 0;
997        let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(res, &mut n) };
998        Poll::Ready(Ok(build_topic_results(topics, n)))
999    }
1000}
1001
1002//
1003// Delete records handling
1004//
1005
1006type NativeDeleteRecords = NativePtr<RDKafkaDeleteRecords>;
1007
1008unsafe impl KafkaDrop for RDKafkaDeleteRecords {
1009    const TYPE: &'static str = "delete records";
1010    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy;
1011}
1012
1013struct DeleteRecordsFuture {
1014    rx: oneshot::Receiver<NativeEvent>,
1015}
1016
1017impl Future for DeleteRecordsFuture {
1018    type Output = KafkaResult<TopicPartitionList>;
1019
1020    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1021        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1022        event.check_error()?;
1023        let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) };
1024        if res.is_null() {
1025            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1026            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1027                "delete records request received response of incorrect type ({})",
1028                typ
1029            ))));
1030        }
1031        let tpl = unsafe {
1032            let tpl = rdsys::rd_kafka_DeleteRecords_result_offsets(res);
1033            TopicPartitionList::from_ptr(rdsys::rd_kafka_topic_partition_list_copy(tpl))
1034        };
1035        Poll::Ready(Ok(tpl))
1036    }
1037}
1038
1039//
1040// Describe configs handling
1041//
1042
1043/// The result of an individual DescribeConfig operation.
1044pub type ConfigResourceResult = Result<ConfigResource, RDKafkaErrorCode>;
1045
1046/// Specification of a configurable resource.
1047#[derive(Copy, Clone, Debug, Eq, PartialEq)]
1048pub enum ResourceSpecifier<'a> {
1049    /// A topic resource, identified by its name.
1050    Topic(&'a str),
1051    /// A group resource, identified by its ID.
1052    Group(&'a str),
1053    /// A broker resource, identified by its ID.
1054    Broker(i32),
1055}
1056
1057/// A `ResourceSpecifier` that owns its data.
1058#[derive(Debug, Eq, PartialEq)]
1059pub enum OwnedResourceSpecifier {
1060    /// A topic resource, identified by its name.
1061    Topic(String),
1062    /// A group resource, identified by its ID.
1063    Group(String),
1064    /// A broker resource, identified by its ID.
1065    Broker(i32),
1066}
1067
1068/// The source of a configuration entry.
1069#[derive(Debug, Eq, PartialEq)]
1070pub enum ConfigSource {
1071    /// Unknown. Note that Kafka brokers before v1.1.0 do not reliably provide
1072    /// configuration source information.
1073    Unknown,
1074    /// A dynamic topic configuration.
1075    DynamicTopic,
1076    /// A dynamic broker configuration.
1077    DynamicBroker,
1078    /// The default dynamic broker configuration.
1079    DynamicDefaultBroker,
1080    /// The static broker configuration.
1081    StaticBroker,
1082    /// The hardcoded default configuration.
1083    Default,
1084}
1085
1086/// An individual configuration parameter for a `ConfigResource`.
1087#[derive(Debug, Eq, PartialEq)]
1088pub struct ConfigEntry {
1089    /// The name of the configuration parameter.
1090    pub name: String,
1091    /// The value of the configuration parameter.
1092    pub value: Option<String>,
1093    /// The source of the configuration parameter.
1094    pub source: ConfigSource,
1095    /// Whether the configuration parameter is read only.
1096    pub is_read_only: bool,
1097    /// Whether the configuration parameter currently has the default value.
1098    pub is_default: bool,
1099    /// Whether the configuration parameter contains sensitive data.
1100    pub is_sensitive: bool,
1101}
1102
1103/// A configurable resource and its current configuration values.
1104#[derive(Debug)]
1105pub struct ConfigResource {
1106    /// Identifies the resource.
1107    pub specifier: OwnedResourceSpecifier,
1108    /// The current configuration parameters.
1109    pub entries: Vec<ConfigEntry>,
1110}
1111
1112impl ConfigResource {
1113    /// Builds a `HashMap` of configuration entries, keyed by configuration
1114    /// entry name.
1115    pub fn entry_map(&self) -> HashMap<&str, &ConfigEntry> {
1116        self.entries.iter().map(|e| (&*e.name, e)).collect()
1117    }
1118
1119    /// Searches the configuration entries to find the named parameter.
1120    ///
1121    /// For more efficient lookups, use `entry_map` to build a `HashMap`
1122    /// instead.
1123    pub fn get(&self, name: &str) -> Option<&ConfigEntry> {
1124        self.entries.iter().find(|e| e.name == name)
1125    }
1126}
1127
1128type NativeConfigResource = NativePtr<RDKafkaConfigResource>;
1129
1130unsafe impl KafkaDrop for RDKafkaConfigResource {
1131    const TYPE: &'static str = "config resource";
1132    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_ConfigResource_destroy;
1133}
1134
1135fn extract_config_specifier(
1136    resource: *const RDKafkaConfigResource,
1137) -> KafkaResult<OwnedResourceSpecifier> {
1138    let typ = unsafe { rdsys::rd_kafka_ConfigResource_type(resource) };
1139    match typ {
1140        RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC => {
1141            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
1142            Ok(OwnedResourceSpecifier::Topic(name))
1143        }
1144        RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP => {
1145            let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
1146            Ok(OwnedResourceSpecifier::Group(name))
1147        }
1148        RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER => {
1149            let name = unsafe { CStr::from_ptr(rdsys::rd_kafka_ConfigResource_name(resource)) }
1150                .to_string_lossy();
1151            match name.parse::<i32>() {
1152                Ok(id) => Ok(OwnedResourceSpecifier::Broker(id)),
1153                Err(_) => Err(KafkaError::AdminOpCreation(format!(
1154                    "bogus broker ID in kafka response: {}",
1155                    name
1156                ))),
1157            }
1158        }
1159        _ => Err(KafkaError::AdminOpCreation(format!(
1160            "bogus resource type in kafka response: {:?}",
1161            typ
1162        ))),
1163    }
1164}
1165
1166fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult<ConfigSource> {
1167    match config_source {
1168        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG => Ok(ConfigSource::Unknown),
1169        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG => {
1170            Ok(ConfigSource::DynamicTopic)
1171        }
1172        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG => {
1173            Ok(ConfigSource::DynamicBroker)
1174        }
1175        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG => {
1176            Ok(ConfigSource::DynamicDefaultBroker)
1177        }
1178        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG => {
1179            Ok(ConfigSource::StaticBroker)
1180        }
1181        RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG => Ok(ConfigSource::Default),
1182        _ => Err(KafkaError::AdminOpCreation(format!(
1183            "bogus config source type in kafka response: {:?}",
1184            config_source,
1185        ))),
1186    }
1187}
1188
1189struct DescribeConfigsFuture {
1190    rx: oneshot::Receiver<NativeEvent>,
1191}
1192
1193impl Future for DescribeConfigsFuture {
1194    type Output = KafkaResult<Vec<ConfigResourceResult>>;
1195
1196    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1197        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1198        event.check_error()?;
1199        let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) };
1200        if res.is_null() {
1201            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1202            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1203                "describe configs request received response of incorrect type ({})",
1204                typ
1205            ))));
1206        }
1207        let mut n = 0;
1208        let resources = unsafe { rdsys::rd_kafka_DescribeConfigs_result_resources(res, &mut n) };
1209        let mut out = Vec::with_capacity(n);
1210        for i in 0..n {
1211            let resource = unsafe { *resources.add(i) };
1212            let specifier = extract_config_specifier(resource)?;
1213            let mut entries_out = Vec::new();
1214            let mut n = 0;
1215            let entries = unsafe { rdsys::rd_kafka_ConfigResource_configs(resource, &mut n) };
1216            for j in 0..n {
1217                let entry = unsafe { *entries.add(j) };
1218                let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigEntry_name(entry)) };
1219                let value = unsafe {
1220                    let value = rdsys::rd_kafka_ConfigEntry_value(entry);
1221                    if value.is_null() {
1222                        None
1223                    } else {
1224                        Some(cstr_to_owned(value))
1225                    }
1226                };
1227                entries_out.push(ConfigEntry {
1228                    name,
1229                    value,
1230                    source: extract_config_source(unsafe {
1231                        rdsys::rd_kafka_ConfigEntry_source(entry)
1232                    })?,
1233                    is_read_only: unsafe { rdsys::rd_kafka_ConfigEntry_is_read_only(entry) } != 0,
1234                    is_default: unsafe { rdsys::rd_kafka_ConfigEntry_is_default(entry) } != 0,
1235                    is_sensitive: unsafe { rdsys::rd_kafka_ConfigEntry_is_sensitive(entry) } != 0,
1236                });
1237            }
1238            out.push(Ok(ConfigResource {
1239                specifier,
1240                entries: entries_out,
1241            }))
1242        }
1243        Poll::Ready(Ok(out))
1244    }
1245}
1246
1247//
1248// Alter configs handling
1249//
1250
1251/// The result of an individual AlterConfig operation.
1252pub type AlterConfigsResult =
1253    Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;
1254
1255/// Configuration for an AlterConfig operation.
1256pub struct AlterConfig<'a> {
1257    /// Identifies the resource to be altered.
1258    pub specifier: ResourceSpecifier<'a>,
1259    /// The configuration parameters to be updated.
1260    pub entries: HashMap<&'a str, &'a str>,
1261}
1262
1263impl<'a> AlterConfig<'a> {
1264    /// Creates a new `AlterConfig`.
1265    pub fn new(specifier: ResourceSpecifier<'_>) -> AlterConfig<'_> {
1266        AlterConfig {
1267            specifier,
1268            entries: HashMap::new(),
1269        }
1270    }
1271
1272    /// Sets the configuration parameter named `key` to the specified `value`.
1273    pub fn set(mut self, key: &'a str, value: &'a str) -> AlterConfig<'a> {
1274        self.entries.insert(key, value);
1275        self
1276    }
1277
1278    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeConfigResource> {
1279        let (name, typ) = match self.specifier {
1280            ResourceSpecifier::Topic(name) => (
1281                CString::new(name)?,
1282                RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
1283            ),
1284            ResourceSpecifier::Group(name) => (
1285                CString::new(name)?,
1286                RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
1287            ),
1288            ResourceSpecifier::Broker(id) => (
1289                CString::new(format!("{}", id))?,
1290                RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
1291            ),
1292        };
1293        // N.B.: we wrap config immediately, so that it is destroyed via the
1294        // NativeNewTopic's Drop implementation if config installation fails.
1295        let config = unsafe {
1296            NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr()))
1297                .unwrap()
1298        };
1299        for (key, val) in &self.entries {
1300            let key_c = CString::new(*key)?;
1301            let val_c = CString::new(*val)?;
1302            let res = unsafe {
1303                rdsys::rd_kafka_ConfigResource_set_config(
1304                    config.ptr(),
1305                    key_c.as_ptr(),
1306                    val_c.as_ptr(),
1307                )
1308            };
1309            check_rdkafka_invalid_arg(res, err_buf)?;
1310        }
1311        Ok(config)
1312    }
1313}
1314
1315struct AlterConfigsFuture {
1316    rx: oneshot::Receiver<NativeEvent>,
1317}
1318
1319impl Future for AlterConfigsFuture {
1320    type Output = KafkaResult<Vec<AlterConfigsResult>>;
1321
1322    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1323        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1324        event.check_error()?;
1325        let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) };
1326        if res.is_null() {
1327            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1328            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1329                "alter configs request received response of incorrect type ({})",
1330                typ
1331            ))));
1332        }
1333        let mut n = 0;
1334        let resources = unsafe { rdsys::rd_kafka_AlterConfigs_result_resources(res, &mut n) };
1335        let mut out = Vec::with_capacity(n);
1336        for i in 0..n {
1337            let resource = unsafe { *resources.add(i) };
1338            let specifier = extract_config_specifier(resource)?;
1339            out.push(Ok(specifier));
1340        }
1341        Poll::Ready(Ok(out))
1342    }
1343}