use std::collections::HashMap;
use std::ffi::{c_void, CStr, CString};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread::{self, JoinHandle};
use std::time::Duration;

use futures_channel::oneshot;
use futures_util::future::{self, Either, FutureExt};
use futures_util::ready;

use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{trace, warn};
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
use crate::TopicPartitionList;

/// A client for Kafka's admin API.
///
/// `AdminClient` provides asynchronous operations for creating, deleting, and
/// altering Kafka objects such as topics, partitions, consumer groups, and
/// configuration resources.
pub struct AdminClient<C: ClientContext> {
    client: Client<C>,
    queue: Arc<NativeQueue>,
    should_stop: Arc<AtomicBool>,
    handle: Option<JoinHandle<()>>,
}

impl<C: ClientContext> AdminClient<C> {
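    /// Creates new topics according to the provided `NewTopic` specifications.
    ///
    /// A minimal usage sketch, not part of the original module: it assumes this
    /// module is exposed as `rdkafka::admin`, a broker reachable at
    /// `localhost:9092`, an async runtime to drive the future, and placeholder
    /// topic settings.
    ///
    /// ```no_run
    /// use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
    /// use rdkafka::client::DefaultClientContext;
    /// use rdkafka::config::ClientConfig;
    /// use rdkafka::error::KafkaResult;
    ///
    /// async fn example() -> KafkaResult<()> {
    ///     let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
    ///         .set("bootstrap.servers", "localhost:9092")
    ///         .create()?;
    ///     let topic = NewTopic::new("example-topic", 3, TopicReplication::Fixed(1));
    ///     let results = admin.create_topics(&[topic], &AdminOptions::new()).await?;
    ///     println!("{:?}", results);
    ///     Ok(())
    /// }
    /// ```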
    pub fn create_topics<'a, I>(
        &self,
        topics: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
    where
        I: IntoIterator<Item = &'a NewTopic<'a>>,
    {
        match self.create_topics_inner(topics, opts) {
            Ok(rx) => Either::Left(CreateTopicsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn create_topics_inner<'a, I>(
        &self,
        topics: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a NewTopic<'a>>,
    {
        let mut native_topics = Vec::new();
        let mut err_buf = ErrBuf::new();
        for t in topics {
            native_topics.push(t.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_CreateTopics(
                self.client.native_ptr(),
                native_topics.as_c_array(),
                native_topics.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

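    /// Deletes the named topics.
    ///
    /// Usage sketch, not from the original source; assumes `admin` was built as
    /// in the `create_topics` example and that the topic exists:
    ///
    /// ```no_run
    /// # use rdkafka::admin::{AdminClient, AdminOptions};
    /// # use rdkafka::client::DefaultClientContext;
    /// # async fn example(admin: &AdminClient<DefaultClientContext>) -> rdkafka::error::KafkaResult<()> {
    /// let results = admin
    ///     .delete_topics(&["example-topic"], &AdminOptions::new())
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```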
    pub fn delete_topics(
        &self,
        topic_names: &[&str],
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>> {
        match self.delete_topics_inner(topic_names, opts) {
            Ok(rx) => Either::Left(DeleteTopicsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn delete_topics_inner(
        &self,
        topic_names: &[&str],
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
        let mut native_topics = Vec::new();
        let mut err_buf = ErrBuf::new();
        for tn in topic_names {
            let tn_c = CString::new(*tn)?;
            let native_topic = unsafe {
                NativeDeleteTopic::from_ptr(rdsys::rd_kafka_DeleteTopic_new(tn_c.as_ptr())).unwrap()
            };
            native_topics.push(native_topic);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_DeleteTopics(
                self.client.native_ptr(),
                native_topics.as_c_array(),
                native_topics.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

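    /// Deletes the named consumer groups.
    ///
    /// Usage sketch, not from the original source; assumes `admin` as above and
    /// an existing consumer group with no active members:
    ///
    /// ```no_run
    /// # use rdkafka::admin::{AdminClient, AdminOptions};
    /// # use rdkafka::client::DefaultClientContext;
    /// # async fn example(admin: &AdminClient<DefaultClientContext>) -> rdkafka::error::KafkaResult<()> {
    /// let results = admin
    ///     .delete_groups(&["example-group"], &AdminOptions::new())
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```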
    pub fn delete_groups(
        &self,
        group_names: &[&str],
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<GroupResult>>> {
        match self.delete_groups_inner(group_names, opts) {
            Ok(rx) => Either::Left(DeleteGroupsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn delete_groups_inner(
        &self,
        group_names: &[&str],
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
        let mut native_groups = Vec::new();
        let mut err_buf = ErrBuf::new();
        for gn in group_names {
            let gn_t = CString::new(*gn)?;
            let native_group = unsafe {
                NativeDeleteGroup::from_ptr(rdsys::rd_kafka_DeleteGroup_new(gn_t.as_ptr())).unwrap()
            };
            native_groups.push(native_group);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;

        unsafe {
            rdsys::rd_kafka_DeleteGroups(
                self.client.native_ptr(),
                native_groups.as_c_array(),
                native_groups.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            )
        }
        Ok(rx)
    }

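    /// Adds partitions to existing topics according to the provided
    /// `NewPartitions` specifications.
    ///
    /// Usage sketch, not from the original source; assumes `admin` as above and
    /// grows a placeholder topic to six partitions in total:
    ///
    /// ```no_run
    /// # use rdkafka::admin::{AdminClient, AdminOptions, NewPartitions};
    /// # use rdkafka::client::DefaultClientContext;
    /// # async fn example(admin: &AdminClient<DefaultClientContext>) -> rdkafka::error::KafkaResult<()> {
    /// let results = admin
    ///     .create_partitions(&[NewPartitions::new("example-topic", 6)], &AdminOptions::new())
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```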
    pub fn create_partitions<'a, I>(
        &self,
        partitions: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<TopicResult>>>
    where
        I: IntoIterator<Item = &'a NewPartitions<'a>>,
    {
        match self.create_partitions_inner(partitions, opts) {
            Ok(rx) => Either::Left(CreatePartitionsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn create_partitions_inner<'a, I>(
        &self,
        partitions: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a NewPartitions<'a>>,
    {
        let mut native_partitions = Vec::new();
        let mut err_buf = ErrBuf::new();
        for p in partitions {
            native_partitions.push(p.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_CreatePartitions(
                self.client.native_ptr(),
                native_partitions.as_c_array(),
                native_partitions.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

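    /// Deletes records from one or more topic partitions, up to (but not
    /// including) the offsets supplied in `offsets`.
    ///
    /// Usage sketch, not from the original source; assumes `admin` as above and
    /// a `TopicPartitionList` API where `add_partition_offset` is fallible.
    /// `Offset::End` truncates the partition entirely:
    ///
    /// ```no_run
    /// # use rdkafka::admin::{AdminClient, AdminOptions};
    /// # use rdkafka::client::DefaultClientContext;
    /// # use rdkafka::{Offset, TopicPartitionList};
    /// # async fn example(admin: &AdminClient<DefaultClientContext>) -> rdkafka::error::KafkaResult<()> {
    /// let mut offsets = TopicPartitionList::new();
    /// offsets.add_partition_offset("example-topic", 0, Offset::End)?;
    /// let result = admin.delete_records(&offsets, &AdminOptions::new()).await?;
    /// # Ok(())
    /// # }
    /// ```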
    pub fn delete_records(
        &self,
        offsets: &TopicPartitionList,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<TopicPartitionList>> {
        match self.delete_records_inner(offsets, opts) {
            Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn delete_records_inner(
        &self,
        offsets: &TopicPartitionList,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
        let mut err_buf = ErrBuf::new();
        let delete_records = unsafe {
            NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr()))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_DeleteRecords(
                self.client.native_ptr(),
                &mut delete_records.ptr(),
                1,
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

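    /// Retrieves the configuration parameters for the specified resources.
    ///
    /// Usage sketch, not from the original source; assumes `admin` as above:
    ///
    /// ```no_run
    /// # use rdkafka::admin::{AdminClient, AdminOptions, ResourceSpecifier};
    /// # use rdkafka::client::DefaultClientContext;
    /// # async fn example(admin: &AdminClient<DefaultClientContext>) -> rdkafka::error::KafkaResult<()> {
    /// let results = admin
    ///     .describe_configs(&[ResourceSpecifier::Topic("example-topic")], &AdminOptions::new())
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```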
    pub fn describe_configs<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<ConfigResourceResult>>>
    where
        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
    {
        match self.describe_configs_inner(configs, opts) {
            Ok(rx) => Either::Left(DescribeConfigsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn describe_configs_inner<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a ResourceSpecifier<'a>>,
    {
        let mut native_configs = Vec::new();
        let mut err_buf = ErrBuf::new();
        for c in configs {
            let (name, typ) = match c {
                ResourceSpecifier::Topic(name) => (
                    CString::new(*name)?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
                ),
                ResourceSpecifier::Group(name) => (
                    CString::new(*name)?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
                ),
                ResourceSpecifier::Broker(id) => (
                    CString::new(format!("{}", id))?,
                    RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
                ),
            };
            native_configs.push(unsafe {
                NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(
                    typ,
                    name.as_ptr(),
                ))
                .unwrap()
            });
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_DescribeConfigs(
                self.client.native_ptr(),
                native_configs.as_c_array(),
                native_configs.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

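    /// Sets the configuration parameters for the specified resources.
    ///
    /// Usage sketch, not from the original source; assumes `admin` as above.
    /// Note that the underlying AlterConfigs protocol replaces the resource's
    /// existing non-default configuration with the supplied entries:
    ///
    /// ```no_run
    /// # use rdkafka::admin::{AdminClient, AdminOptions, AlterConfig, ResourceSpecifier};
    /// # use rdkafka::client::DefaultClientContext;
    /// # async fn example(admin: &AdminClient<DefaultClientContext>) -> rdkafka::error::KafkaResult<()> {
    /// let config = AlterConfig::new(ResourceSpecifier::Topic("example-topic"))
    ///     .set("cleanup.policy", "compact");
    /// let results = admin.alter_configs(&[config], &AdminOptions::new()).await?;
    /// # Ok(())
    /// # }
    /// ```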
    pub fn alter_configs<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> impl Future<Output = KafkaResult<Vec<AlterConfigsResult>>>
    where
        I: IntoIterator<Item = &'a AlterConfig<'a>>,
    {
        match self.alter_configs_inner(configs, opts) {
            Ok(rx) => Either::Left(AlterConfigsFuture { rx }),
            Err(err) => Either::Right(future::err(err)),
        }
    }

    fn alter_configs_inner<'a, I>(
        &self,
        configs: I,
        opts: &AdminOptions,
    ) -> KafkaResult<oneshot::Receiver<NativeEvent>>
    where
        I: IntoIterator<Item = &'a AlterConfig<'a>>,
    {
        let mut native_configs = Vec::new();
        let mut err_buf = ErrBuf::new();
        for c in configs {
            native_configs.push(c.to_native(&mut err_buf)?);
        }
        let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
        unsafe {
            rdsys::rd_kafka_AlterConfigs(
                self.client.native_ptr(),
                native_configs.as_c_array(),
                native_configs.len(),
                native_opts.ptr(),
                self.queue.ptr(),
            );
        }
        Ok(rx)
    }

    /// Returns the underlying `Client` instance.
    pub fn inner(&self) -> &Client<C> {
        &self.client
    }
}

impl FromClientConfig for AdminClient<DefaultClientContext> {
    fn from_config(config: &ClientConfig) -> KafkaResult<AdminClient<DefaultClientContext>> {
        AdminClient::from_config_and_context(config, DefaultClientContext)
    }
}

impl<C: ClientContext> FromClientConfigAndContext<C> for AdminClient<C> {
    fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<AdminClient<C>> {
        let native_config = config.create_native_config()?;
        let client = Client::new(
            config,
            native_config,
            RDKafkaType::RD_KAFKA_PRODUCER,
            context,
        )?;
        let queue = Arc::new(client.new_native_queue());
        let should_stop = Arc::new(AtomicBool::new(false));
        let handle = start_poll_thread(queue.clone(), should_stop.clone());
        Ok(AdminClient {
            client,
            queue,
            should_stop,
            handle: Some(handle),
        })
    }
}

impl<C: ClientContext> Drop for AdminClient<C> {
    fn drop(&mut self) {
        trace!("Stopping polling");
        self.should_stop.store(true, Ordering::Relaxed);
        trace!("Waiting for polling thread termination");
        match self.handle.take().unwrap().join() {
            Ok(()) => trace!("Polling stopped"),
            Err(e) => warn!("Failure while terminating thread: {:?}", e),
        };
    }
}

fn start_poll_thread(queue: Arc<NativeQueue>, should_stop: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::Builder::new()
        .name("admin client polling thread".into())
        .spawn(move || {
            trace!("Admin polling thread loop started");
            loop {
                let event = queue.poll(Duration::from_millis(100));
                if event.is_null() {
                    if should_stop.load(Ordering::Relaxed) {
                        // Nothing to process and the client has been dropped:
                        // exit the polling loop.
                        break;
                    }
                    continue;
                }
                let event = unsafe { NativeEvent::from_ptr(event).unwrap() };
                // The opaque pointer installed in `AdminOptions::to_native` is
                // the boxed oneshot sender for this operation's result.
                let tx: Box<oneshot::Sender<NativeEvent>> =
                    unsafe { IntoOpaque::from_ptr(rdsys::rd_kafka_event_opaque(event.ptr())) };
                let _ = tx.send(event);
            }
            trace!("Admin polling thread loop terminated");
        })
        .expect("Failed to start polling thread")
}

pub(crate) type NativeEvent = NativePtr<RDKafkaEvent>;

unsafe impl KafkaDrop for RDKafkaEvent {
    const TYPE: &'static str = "event";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_event_destroy;
}

unsafe impl Send for NativeEvent {}
unsafe impl Sync for NativeEvent {}

impl NativePtr<RDKafkaEvent> {
    fn check_error(&self) -> KafkaResult<()> {
        let err = unsafe { rdsys::rd_kafka_event_error(self.ptr()) };
        if err.is_error() {
            Err(KafkaError::AdminOp(err.into()))
        } else {
            Ok(())
        }
    }
}

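/// Options for an admin API request.
///
/// Builder-style usage sketch (not from the original source; the timeout
/// values are placeholders):
///
/// ```no_run
/// use std::time::Duration;
///
/// use rdkafka::admin::AdminOptions;
///
/// let opts = AdminOptions::new()
///     .request_timeout(Some(Duration::from_secs(5)))
///     .operation_timeout(Some(Duration::from_secs(30)))
///     .validate_only(false);
/// ```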
#[derive(Default)]
pub struct AdminOptions {
    request_timeout: Option<Timeout>,
    operation_timeout: Option<Timeout>,
    validate_only: bool,
    broker_id: Option<i32>,
}

impl AdminOptions {
    /// Creates a new `AdminOptions` with all options unset.
    pub fn new() -> AdminOptions {
        AdminOptions::default()
    }

    /// Sets the overall request timeout, including broker lookup, request
    /// transmission, operation time on the broker, and response.
    pub fn request_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
        self.request_timeout = timeout.map(Into::into);
        self
    }

    /// Sets the broker-side operation timeout, i.e. how long the broker waits
    /// for the operation (e.g. topic creation) to complete before returning.
    pub fn operation_timeout<T: Into<Timeout>>(mut self, timeout: Option<T>) -> Self {
        self.operation_timeout = timeout.map(Into::into);
        self
    }

    /// If true, the broker only validates the request without performing it.
    pub fn validate_only(mut self, validate_only: bool) -> Self {
        self.validate_only = validate_only;
        self
    }

    /// Overrides which broker the admin request is sent to.
    pub fn broker_id<T: Into<Option<i32>>>(mut self, broker_id: T) -> Self {
        self.broker_id = broker_id.into();
        self
    }

    fn to_native(
        &self,
        client: *mut RDKafka,
        err_buf: &mut ErrBuf,
    ) -> KafkaResult<(NativeAdminOptions, oneshot::Receiver<NativeEvent>)> {
        let native_opts = unsafe {
            NativeAdminOptions::from_ptr(rdsys::rd_kafka_AdminOptions_new(
                client,
                RDKafkaAdminOp::RD_KAFKA_ADMIN_OP_ANY,
            ))
            .unwrap()
        };

        if let Some(timeout) = self.request_timeout {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_request_timeout(
                    native_opts.ptr(),
                    timeout.as_millis(),
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        if let Some(timeout) = self.operation_timeout {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_operation_timeout(
                    native_opts.ptr(),
                    timeout.as_millis(),
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        if self.validate_only {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_validate_only(
                    native_opts.ptr(),
                    1, // true
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        if let Some(broker_id) = self.broker_id {
            let res = unsafe {
                rdsys::rd_kafka_AdminOptions_set_broker(
                    native_opts.ptr(),
                    broker_id,
                    err_buf.as_mut_ptr(),
                    err_buf.capacity(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }

        let (tx, rx) = oneshot::channel();
        let tx = Box::into_raw(Box::new(tx)) as *mut c_void;
        unsafe { rdsys::rd_kafka_AdminOptions_set_opaque(native_opts.ptr(), tx) };

        Ok((native_opts, rx))
    }
}

unsafe impl KafkaDrop for RDKafkaAdminOptions {
    const TYPE: &'static str = "admin options";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_AdminOptions_destroy;
}

type NativeAdminOptions = NativePtr<RDKafkaAdminOptions>;

fn check_rdkafka_invalid_arg(res: RDKafkaRespErr, err_buf: &ErrBuf) -> KafkaResult<()> {
    match res.into() {
        RDKafkaErrorCode::NoError => Ok(()),
        RDKafkaErrorCode::InvalidArgument => {
            let msg = if err_buf.len() == 0 {
                "invalid argument".into()
            } else {
                err_buf.to_string()
            };
            Err(KafkaError::AdminOpCreation(msg))
        }
        res => Err(KafkaError::AdminOpCreation(format!(
            "setting admin options returned unexpected error code {}",
            res
        ))),
    }
}

/// The result of an individual topic operation (creation, deletion, or
/// partition creation): the topic name on success, or the topic name paired
/// with the error code on failure.
pub type TopicResult = Result<String, (String, RDKafkaErrorCode)>;

fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Vec<TopicResult> {
    let mut out = Vec::with_capacity(n);
    for i in 0..n {
        let topic = unsafe { *topics.add(i) };
        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_topic_result_name(topic)) };
        let err = unsafe { rdsys::rd_kafka_topic_result_error(topic) };
        if err.is_error() {
            out.push(Err((name, err.into())));
        } else {
            out.push(Ok(name));
        }
    }
    out
}

/// The result of an individual group operation: the group name on success, or
/// the group name paired with the error code on failure.
pub type GroupResult = Result<String, (String, RDKafkaErrorCode)>;

fn build_group_results(groups: *const *const RDKafkaGroupResult, n: usize) -> Vec<GroupResult> {
    let mut out = Vec::with_capacity(n);
    for i in 0..n {
        let group = unsafe { *groups.add(i) };
        let name = unsafe { cstr_to_owned(rdsys::rd_kafka_group_result_name(group)) };
        let err = unsafe {
            let err = rdsys::rd_kafka_group_result_error(group);
            rdsys::rd_kafka_error_code(err)
        };
        if err.is_error() {
            out.push(Err((name, err.into())));
        } else {
            out.push(Ok(name));
        }
    }
    out
}

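/// Configuration for a topic to be created via `AdminClient::create_topics`.
///
/// Usage sketch (not from the original source; topic names, partition counts,
/// and broker IDs are placeholders):
///
/// ```no_run
/// use rdkafka::admin::{NewTopic, TopicReplication};
///
/// // Let the controller place replicas, with a fixed replication factor.
/// let simple = NewTopic::new("logs", 6, TopicReplication::Fixed(3))
///     .set("retention.ms", "86400000");
///
/// // Explicit replica assignment: one entry of broker IDs per partition.
/// let assignment: &[&[i32]] = &[&[1, 2], &[2, 3]];
/// let assigned = NewTopic::new("events", 2, TopicReplication::Variable(assignment));
/// ```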
#[derive(Debug)]
pub struct NewTopic<'a> {
    /// The name of the new topic.
    pub name: &'a str,
    /// The initial number of partitions.
    pub num_partitions: i32,
    /// The replication configuration for the new topic.
    pub replication: TopicReplication<'a>,
    /// The initial configuration parameters for the topic.
    pub config: Vec<(&'a str, &'a str)>,
}

impl<'a> NewTopic<'a> {
    /// Creates a new `NewTopic`.
    pub fn new(
        name: &'a str,
        num_partitions: i32,
        replication: TopicReplication<'a>,
    ) -> NewTopic<'a> {
        NewTopic {
            name,
            num_partitions,
            replication,
            config: Vec::new(),
        }
    }

    /// Sets a new parameter in the configuration of the new topic.
    pub fn set(mut self, key: &'a str, value: &'a str) -> NewTopic<'a> {
        self.config.push((key, value));
        self
    }

    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewTopic> {
        let name = CString::new(self.name)?;
        let repl = match self.replication {
            TopicReplication::Fixed(n) => n,
            TopicReplication::Variable(partitions) => {
                if partitions.len() as i32 != self.num_partitions {
                    return Err(KafkaError::AdminOpCreation(format!(
                        "replication configuration for topic '{}' assigns {} partition(s), \
                         which does not match the specified number of partitions ({})",
                        self.name,
                        partitions.len(),
                        self.num_partitions,
                    )));
                }
                -1
            }
        };
        let topic = unsafe {
            NativeNewTopic::from_ptr(rdsys::rd_kafka_NewTopic_new(
                name.as_ptr(),
                self.num_partitions,
                repl,
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            ))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;

        if let TopicReplication::Variable(assignment) = self.replication {
            for (partition_id, broker_ids) in assignment.iter().enumerate() {
                let res = unsafe {
                    rdsys::rd_kafka_NewTopic_set_replica_assignment(
                        topic.ptr(),
                        partition_id as i32,
                        broker_ids.as_ptr() as *mut i32,
                        broker_ids.len(),
                        err_buf.as_mut_ptr(),
                        err_buf.capacity(),
                    )
                };
                check_rdkafka_invalid_arg(res, err_buf)?;
            }
        }
        for (key, val) in &self.config {
            let key_c = CString::new(*key)?;
            let val_c = CString::new(*val)?;
            let res = unsafe {
                rdsys::rd_kafka_NewTopic_set_config(topic.ptr(), key_c.as_ptr(), val_c.as_ptr())
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }
        Ok(topic)
    }
}

/// An assignment of partitions to replicas: one entry per partition, each
/// listing the broker IDs that should host that partition's replicas.
pub type PartitionAssignment<'a> = &'a [&'a [i32]];

/// The replication configuration for a new topic.
#[derive(Debug)]
pub enum TopicReplication<'a> {
    /// All partitions should use the same fixed replication factor.
    Fixed(i32),
    /// Each partition should use the exact replica assignment provided.
    Variable(PartitionAssignment<'a>),
}

type NativeNewTopic = NativePtr<RDKafkaNewTopic>;

unsafe impl KafkaDrop for RDKafkaNewTopic {
    const TYPE: &'static str = "new topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewTopic_destroy;
}

struct CreateTopicsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for CreateTopicsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_CreateTopics_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "create topics request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let topics = unsafe { rdsys::rd_kafka_CreateTopics_result_topics(res, &mut n) };
        Poll::Ready(Ok(build_topic_results(topics, n)))
    }
}

type NativeDeleteTopic = NativePtr<RDKafkaDeleteTopic>;

unsafe impl KafkaDrop for RDKafkaDeleteTopic {
    const TYPE: &'static str = "delete topic";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteTopic_destroy;
}

struct DeleteTopicsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteTopicsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_DeleteTopics_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "delete topics request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let topics = unsafe { rdsys::rd_kafka_DeleteTopics_result_topics(res, &mut n) };
        Poll::Ready(Ok(build_topic_results(topics, n)))
    }
}

type NativeDeleteGroup = NativePtr<RDKafkaDeleteGroup>;

unsafe impl KafkaDrop for RDKafkaDeleteGroup {
    const TYPE: &'static str = "delete group";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteGroup_destroy;
}

struct DeleteGroupsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteGroupsFuture {
    type Output = KafkaResult<Vec<GroupResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_DeleteGroups_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "delete groups request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let groups = unsafe { rdsys::rd_kafka_DeleteGroups_result_groups(res, &mut n) };
        Poll::Ready(Ok(build_group_results(groups, n)))
    }
}

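/// Configuration for adding partitions to an existing topic via
/// `AdminClient::create_partitions`.
///
/// Usage sketch (not from the original source; names and broker IDs are
/// placeholders). Note that `new_partition_count` is the desired total number
/// of partitions, not the number of partitions to add:
///
/// ```no_run
/// use rdkafka::admin::NewPartitions;
///
/// // Grow the topic to four partitions, with explicit replica assignments
/// // for the newly created partitions.
/// let assignment: &[&[i32]] = &[&[1, 2], &[2, 3]];
/// let parts = NewPartitions::new("events", 4).assign(assignment);
/// ```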
pub struct NewPartitions<'a> {
    /// The name of the topic to which partitions should be added.
    pub topic_name: &'a str,
    /// The total number of partitions the topic should have after the
    /// operation completes.
    pub new_partition_count: usize,
    /// The replica assignments for the new partitions.
    pub assignment: Option<PartitionAssignment<'a>>,
}

impl<'a> NewPartitions<'a> {
    /// Creates a new `NewPartitions`.
    pub fn new(topic_name: &'a str, new_partition_count: usize) -> NewPartitions<'a> {
        NewPartitions {
            topic_name,
            new_partition_count,
            assignment: None,
        }
    }

    /// Sets the partition replica assignment for the new partitions.
    pub fn assign(mut self, assignment: PartitionAssignment<'a>) -> NewPartitions<'a> {
        self.assignment = Some(assignment);
        self
    }

    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeNewPartitions> {
        let name = CString::new(self.topic_name)?;
        if let Some(assignment) = self.assignment {
            if assignment.len() > self.new_partition_count {
                return Err(KafkaError::AdminOpCreation(format!(
                    "partition assignment for topic '{}' assigns {} partition(s), \
                     which is more than the requested total number of partitions ({})",
                    self.topic_name,
                    assignment.len(),
                    self.new_partition_count,
                )));
            }
        }
        let partitions = unsafe {
            NativeNewPartitions::from_ptr(rdsys::rd_kafka_NewPartitions_new(
                name.as_ptr(),
                self.new_partition_count,
                err_buf.as_mut_ptr(),
                err_buf.capacity(),
            ))
        }
        .ok_or_else(|| KafkaError::AdminOpCreation(err_buf.to_string()))?;

        if let Some(assignment) = self.assignment {
            for (partition_id, broker_ids) in assignment.iter().enumerate() {
                let res = unsafe {
                    rdsys::rd_kafka_NewPartitions_set_replica_assignment(
                        partitions.ptr(),
                        partition_id as i32,
                        broker_ids.as_ptr() as *mut i32,
                        broker_ids.len(),
                        err_buf.as_mut_ptr(),
                        err_buf.capacity(),
                    )
                };
                check_rdkafka_invalid_arg(res, err_buf)?;
            }
        }
        Ok(partitions)
    }
}

type NativeNewPartitions = NativePtr<RDKafkaNewPartitions>;

unsafe impl KafkaDrop for RDKafkaNewPartitions {
    const TYPE: &'static str = "new partitions";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_NewPartitions_destroy;
}

struct CreatePartitionsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for CreatePartitionsFuture {
    type Output = KafkaResult<Vec<TopicResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_CreatePartitions_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "create partitions request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let topics = unsafe { rdsys::rd_kafka_CreatePartitions_result_topics(res, &mut n) };
        Poll::Ready(Ok(build_topic_results(topics, n)))
    }
}

type NativeDeleteRecords = NativePtr<RDKafkaDeleteRecords>;

unsafe impl KafkaDrop for RDKafkaDeleteRecords {
    const TYPE: &'static str = "delete records";
    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy;
}

struct DeleteRecordsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteRecordsFuture {
    type Output = KafkaResult<TopicPartitionList>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "delete records request received response of incorrect type ({})",
                typ
            ))));
        }
        let tpl = unsafe {
            let tpl = rdsys::rd_kafka_DeleteRecords_result_offsets(res);
            TopicPartitionList::from_ptr(rdsys::rd_kafka_topic_partition_list_copy(tpl))
        };
        Poll::Ready(Ok(tpl))
    }
}

/// The result of an individual DescribeConfigs operation: the resource's
/// configuration on success, or the error code on failure.
pub type ConfigResourceResult = Result<ConfigResource, RDKafkaErrorCode>;

/// Specification of a configurable resource.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum ResourceSpecifier<'a> {
    /// A topic resource, identified by its name.
    Topic(&'a str),
    /// A group resource, identified by its name.
    Group(&'a str),
    /// A broker resource, identified by its ID.
    Broker(i32),
}

/// An owned version of `ResourceSpecifier`, as returned in admin API results.
#[derive(Debug, Eq, PartialEq)]
pub enum OwnedResourceSpecifier {
    /// A topic resource, identified by its name.
    Topic(String),
    /// A group resource, identified by its name.
    Group(String),
    /// A broker resource, identified by its ID.
    Broker(i32),
}

/// The source of a configuration entry.
#[derive(Debug, Eq, PartialEq)]
pub enum ConfigSource {
    /// The source is unknown.
    Unknown,
    /// A dynamic topic configuration.
    DynamicTopic,
    /// A dynamic broker configuration.
    DynamicBroker,
    /// A dynamic default broker configuration.
    DynamicDefaultBroker,
    /// A static broker configuration.
    StaticBroker,
    /// The default configuration.
    Default,
}

/// A single configuration parameter returned by DescribeConfigs.
#[derive(Debug, Eq, PartialEq)]
pub struct ConfigEntry {
    /// The name of the configuration parameter.
    pub name: String,
    /// The value of the configuration parameter, if any.
    pub value: Option<String>,
    /// The source of the configuration parameter.
    pub source: ConfigSource,
    /// Whether the configuration parameter is read only.
    pub is_read_only: bool,
    /// Whether the configuration parameter currently has its default value.
    pub is_default: bool,
    /// Whether the configuration parameter contains sensitive data.
    pub is_sensitive: bool,
}

/// A configuration resource and its current configuration entries.
#[derive(Debug)]
pub struct ConfigResource {
    /// The resource that these entries belong to.
    pub specifier: OwnedResourceSpecifier,
    /// The configuration entries for the resource.
    pub entries: Vec<ConfigEntry>,
}

impl ConfigResource {
    /// Builds a map from configuration entry name to `ConfigEntry`.
    pub fn entry_map(&self) -> HashMap<&str, &ConfigEntry> {
        self.entries.iter().map(|e| (&*e.name, e)).collect()
    }

    /// Looks up a configuration entry by name, scanning `entries` linearly.
    pub fn get(&self, name: &str) -> Option<&ConfigEntry> {
        self.entries.iter().find(|e| e.name == name)
    }
}
1127
1128type NativeConfigResource = NativePtr<RDKafkaConfigResource>;
1129
1130unsafe impl KafkaDrop for RDKafkaConfigResource {
1131 const TYPE: &'static str = "config resource";
1132 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_ConfigResource_destroy;
1133}
1134
1135fn extract_config_specifier(
1136 resource: *const RDKafkaConfigResource,
1137) -> KafkaResult<OwnedResourceSpecifier> {
1138 let typ = unsafe { rdsys::rd_kafka_ConfigResource_type(resource) };
1139 match typ {
1140 RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC => {
1141 let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
1142 Ok(OwnedResourceSpecifier::Topic(name))
1143 }
1144 RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP => {
1145 let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigResource_name(resource)) };
1146 Ok(OwnedResourceSpecifier::Group(name))
1147 }
1148 RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER => {
1149 let name = unsafe { CStr::from_ptr(rdsys::rd_kafka_ConfigResource_name(resource)) }
1150 .to_string_lossy();
1151 match name.parse::<i32>() {
1152 Ok(id) => Ok(OwnedResourceSpecifier::Broker(id)),
1153 Err(_) => Err(KafkaError::AdminOpCreation(format!(
1154 "bogus broker ID in kafka response: {}",
1155 name
1156 ))),
1157 }
1158 }
1159 _ => Err(KafkaError::AdminOpCreation(format!(
1160 "bogus resource type in kafka response: {:?}",
1161 typ
1162 ))),
1163 }
1164}
1165
1166fn extract_config_source(config_source: RDKafkaConfigSource) -> KafkaResult<ConfigSource> {
1167 match config_source {
1168 RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG => Ok(ConfigSource::Unknown),
1169 RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG => {
1170 Ok(ConfigSource::DynamicTopic)
1171 }
1172 RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG => {
1173 Ok(ConfigSource::DynamicBroker)
1174 }
1175 RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG => {
1176 Ok(ConfigSource::DynamicDefaultBroker)
1177 }
1178 RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG => {
1179 Ok(ConfigSource::StaticBroker)
1180 }
1181 RDKafkaConfigSource::RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG => Ok(ConfigSource::Default),
1182 _ => Err(KafkaError::AdminOpCreation(format!(
1183 "bogus config source type in kafka response: {:?}",
1184 config_source,
1185 ))),
1186 }
1187}
1188
1189struct DescribeConfigsFuture {
1190 rx: oneshot::Receiver<NativeEvent>,
1191}
1192
1193impl Future for DescribeConfigsFuture {
1194 type Output = KafkaResult<Vec<ConfigResourceResult>>;
1195
1196 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1197 let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
1198 event.check_error()?;
1199 let res = unsafe { rdsys::rd_kafka_event_DescribeConfigs_result(event.ptr()) };
1200 if res.is_null() {
1201 let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
1202 return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
1203 "describe configs request received response of incorrect type ({})",
1204 typ
1205 ))));
1206 }
1207 let mut n = 0;
1208 let resources = unsafe { rdsys::rd_kafka_DescribeConfigs_result_resources(res, &mut n) };
1209 let mut out = Vec::with_capacity(n);
1210 for i in 0..n {
1211 let resource = unsafe { *resources.add(i) };
1212 let specifier = extract_config_specifier(resource)?;
1213 let mut entries_out = Vec::new();
1214 let mut n = 0;
1215 let entries = unsafe { rdsys::rd_kafka_ConfigResource_configs(resource, &mut n) };
1216 for j in 0..n {
1217 let entry = unsafe { *entries.add(j) };
1218 let name = unsafe { cstr_to_owned(rdsys::rd_kafka_ConfigEntry_name(entry)) };
1219 let value = unsafe {
1220 let value = rdsys::rd_kafka_ConfigEntry_value(entry);
1221 if value.is_null() {
1222 None
1223 } else {
1224 Some(cstr_to_owned(value))
1225 }
1226 };
1227 entries_out.push(ConfigEntry {
1228 name,
1229 value,
1230 source: extract_config_source(unsafe {
1231 rdsys::rd_kafka_ConfigEntry_source(entry)
1232 })?,
1233 is_read_only: unsafe { rdsys::rd_kafka_ConfigEntry_is_read_only(entry) } != 0,
1234 is_default: unsafe { rdsys::rd_kafka_ConfigEntry_is_default(entry) } != 0,
1235 is_sensitive: unsafe { rdsys::rd_kafka_ConfigEntry_is_sensitive(entry) } != 0,
1236 });
1237 }
1238 out.push(Ok(ConfigResource {
1239 specifier,
1240 entries: entries_out,
1241 }))
1242 }
1243 Poll::Ready(Ok(out))
1244 }
1245}

/// The result of an individual AlterConfig operation: the affected resource on
/// success, or the resource paired with the error code on failure.
pub type AlterConfigsResult =
    Result<OwnedResourceSpecifier, (OwnedResourceSpecifier, RDKafkaErrorCode)>;

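/// A request to alter a resource's configuration, for use with
/// `AdminClient::alter_configs`.
///
/// Usage sketch (not from the original source; resource name and entries are
/// placeholders):
///
/// ```no_run
/// use rdkafka::admin::{AlterConfig, ResourceSpecifier};
///
/// let alter = AlterConfig::new(ResourceSpecifier::Topic("events"))
///     .set("max.message.bytes", "1048576")
///     .set("retention.ms", "604800000");
/// ```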
pub struct AlterConfig<'a> {
    /// Identifies the resource to be altered.
    pub specifier: ResourceSpecifier<'a>,
    /// The configuration parameters to set on the resource.
    pub entries: HashMap<&'a str, &'a str>,
}

impl<'a> AlterConfig<'a> {
    /// Creates a new `AlterConfig`.
    pub fn new(specifier: ResourceSpecifier<'_>) -> AlterConfig<'_> {
        AlterConfig {
            specifier,
            entries: HashMap::new(),
        }
    }

    /// Sets the configuration parameter named `key` to the specified `value`.
    pub fn set(mut self, key: &'a str, value: &'a str) -> AlterConfig<'a> {
        self.entries.insert(key, value);
        self
    }

    fn to_native(&self, err_buf: &mut ErrBuf) -> KafkaResult<NativeConfigResource> {
        let (name, typ) = match self.specifier {
            ResourceSpecifier::Topic(name) => (
                CString::new(name)?,
                RDKafkaResourceType::RD_KAFKA_RESOURCE_TOPIC,
            ),
            ResourceSpecifier::Group(name) => (
                CString::new(name)?,
                RDKafkaResourceType::RD_KAFKA_RESOURCE_GROUP,
            ),
            ResourceSpecifier::Broker(id) => (
                CString::new(format!("{}", id))?,
                RDKafkaResourceType::RD_KAFKA_RESOURCE_BROKER,
            ),
        };
        let config = unsafe {
            NativeConfigResource::from_ptr(rdsys::rd_kafka_ConfigResource_new(typ, name.as_ptr()))
                .unwrap()
        };
        for (key, val) in &self.entries {
            let key_c = CString::new(*key)?;
            let val_c = CString::new(*val)?;
            let res = unsafe {
                rdsys::rd_kafka_ConfigResource_set_config(
                    config.ptr(),
                    key_c.as_ptr(),
                    val_c.as_ptr(),
                )
            };
            check_rdkafka_invalid_arg(res, err_buf)?;
        }
        Ok(config)
    }
}

struct AlterConfigsFuture {
    rx: oneshot::Receiver<NativeEvent>,
}

impl Future for AlterConfigsFuture {
    type Output = KafkaResult<Vec<AlterConfigsResult>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
        event.check_error()?;
        let res = unsafe { rdsys::rd_kafka_event_AlterConfigs_result(event.ptr()) };
        if res.is_null() {
            let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
            return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
                "alter configs request received response of incorrect type ({})",
                typ
            ))));
        }
        let mut n = 0;
        let resources = unsafe { rdsys::rd_kafka_AlterConfigs_result_resources(res, &mut n) };
        let mut out = Vec::with_capacity(n);
        for i in 0..n {
            let resource = unsafe { *resources.add(i) };
            let specifier = extract_config_specifier(resource)?;
            out.push(Ok(specifier));
        }
        Poll::Ready(Ok(out))
    }
}