1use std::collections::BTreeMap;
4
5use crabka_protocol::owned::{
6 create_partitions_request::{CreatePartitionsRequest, CreatePartitionsTopic},
7 create_topics_request::{CreatableTopic, CreatableTopicConfig, CreateTopicsRequest},
8 delete_topics_request::{DeleteTopicState, DeleteTopicsRequest},
9 metadata_request::{MetadataRequest, MetadataRequestTopic},
10};
11use crabka_protocol::primitives::uuid::Uuid as ProtoUuid;
12use uuid::Uuid;
13
14use crate::{AdminClient, AdminError, KafkaError, NOT_CONTROLLER, kafka_error_name};
15
/// Description of one topic to create via [`AdminClient::create_topics`].
///
/// The type is plain data, so it also implements `PartialEq`/`Eq` to let
/// callers and tests compare specs by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateTopicSpec {
    /// Topic name, copied verbatim into the request.
    pub name: String,
    /// Partition count, sent as the request's `num_partitions`.
    pub partitions: i32,
    /// Replication factor. The wire field is an `i16`; a value that does not
    /// fit is sent as `i16::MAX` when the request is built.
    pub replicas: i32,
    /// Topic-level configuration overrides as `key -> value`; entries are
    /// emitted in the map's (sorted) key order.
    pub configs: BTreeMap<String, String>,
}
23
/// Per-topic result of a CreateTopics call.
#[derive(Debug, Clone)]
pub struct CreateTopicOutcome {
    /// Topic name as reported in the response.
    pub name: String,
    /// Topic id from the response; `None` when the response carried the
    /// all-zero UUID.
    pub topic_id: Option<Uuid>,
    /// `None` when the response error code was `0`, otherwise the decoded error.
    pub error: Option<KafkaError>,
}
30
/// Per-topic result of a DeleteTopics call.
#[derive(Debug, Clone)]
pub struct DeleteTopicOutcome {
    /// Topic name from the response; the empty string when the response
    /// entry had no name.
    pub name: String,
    /// `None` when the response error code was `0`, otherwise the decoded error.
    pub error: Option<KafkaError>,
}
36
/// One topic to grow via [`AdminClient::create_partitions`].
///
/// The type is plain data, so it also implements `PartialEq`/`Eq` to let
/// callers and tests compare operations by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreatePartitionsOp {
    /// Name of the topic to add partitions to.
    pub name: String,
    /// Desired total partition count; sent as the request's `count` field.
    pub new_total_count: i32,
}
42
/// Per-topic result of a CreatePartitions call.
#[derive(Debug, Clone)]
pub struct CreatePartitionsOutcome {
    /// Topic name as reported in the response.
    pub name: String,
    /// `None` when the response error code was `0`, otherwise the decoded error.
    pub error: Option<KafkaError>,
}
48
/// Condensed view of a Metadata response, as returned by
/// [`AdminClient::metadata`].
#[derive(Debug, Clone, Default)]
pub struct TopicMetadata {
    /// The `controller_id` field of the response, copied unchanged.
    pub controller_id: i32,
    /// One entry per topic in the response, in response order.
    pub topics: Vec<TopicMetadataEntry>,
}
54
/// Summary of a single topic taken from a Metadata response.
#[derive(Debug, Clone)]
pub struct TopicMetadataEntry {
    /// Topic name; the empty string when the response entry had no name.
    pub name: String,
    /// Topic id; `None` when the response carried the all-zero UUID.
    pub topic_id: Option<Uuid>,
    /// Number of partition entries listed for the topic (saturating at
    /// `i32::MAX`).
    pub partition_count: i32,
    /// Number of replica nodes of the first listed partition (capped at
    /// `i16::MAX`), or `0` when the topic lists no partitions.
    pub replication_factor: i32,
    /// `None` when the topic's error code was `0`; otherwise the decoded
    /// error, which never carries a message for metadata entries.
    pub error: Option<KafkaError>,
}
63
64impl AdminClient {
65 pub async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError> {
68 let req = build_metadata(topics);
69 let resp = self.conn.send(req).await?;
70 Ok(parse_metadata(resp))
71 }
72
73 pub async fn create_topics(
74 &mut self,
75 specs: &[CreateTopicSpec],
76 timeout_ms: i32,
77 ) -> Result<Vec<CreateTopicOutcome>, AdminError> {
78 let first = {
79 let req = build_create_topics(specs, timeout_ms);
80 let resp = self.conn.send(req).await?;
81 parse_create_topics(resp)
82 };
83 if !any_not_controller(&first, |o| o.error.as_ref()) {
84 return Ok(first);
85 }
86 self.refresh_controller_connection().await?;
87 let second = {
88 let req = build_create_topics(specs, timeout_ms);
89 let resp = self.conn.send(req).await?;
90 parse_create_topics(resp)
91 };
92 if any_not_controller(&second, |o| o.error.as_ref()) {
93 return Err(AdminError::NotControllerExhausted);
94 }
95 Ok(second)
96 }
97
98 pub async fn delete_topics(
99 &mut self,
100 names: &[&str],
101 timeout_ms: i32,
102 ) -> Result<Vec<DeleteTopicOutcome>, AdminError> {
103 let build = || DeleteTopicsRequest {
109 topic_names: names.iter().map(|s| (*s).to_string()).collect(),
110 topics: names
111 .iter()
112 .map(|s| DeleteTopicState {
113 name: Some((*s).to_string()),
114 topic_id: ProtoUuid::ZERO,
115 ..Default::default()
116 })
117 .collect(),
118 timeout_ms,
119 ..Default::default()
120 };
121 let first = parse_delete_topics(self.conn.send(build()).await?);
122 if !any_not_controller(&first, |o| o.error.as_ref()) {
123 return Ok(first);
124 }
125 self.refresh_controller_connection().await?;
126 let second = parse_delete_topics(self.conn.send(build()).await?);
127 if any_not_controller(&second, |o| o.error.as_ref()) {
128 return Err(AdminError::NotControllerExhausted);
129 }
130 Ok(second)
131 }
132
133 pub async fn create_partitions(
134 &mut self,
135 ops: &[CreatePartitionsOp],
136 timeout_ms: i32,
137 ) -> Result<Vec<CreatePartitionsOutcome>, AdminError> {
138 let build = || CreatePartitionsRequest {
139 topics: ops
140 .iter()
141 .map(|o| CreatePartitionsTopic {
142 name: o.name.clone(),
143 count: o.new_total_count,
144 assignments: None,
145 ..Default::default()
146 })
147 .collect(),
148 timeout_ms,
149 validate_only: false,
150 ..Default::default()
151 };
152 let first = parse_create_partitions(self.conn.send(build()).await?);
153 if !any_not_controller(&first, |o| o.error.as_ref()) {
154 return Ok(first);
155 }
156 self.refresh_controller_connection().await?;
157 let second = parse_create_partitions(self.conn.send(build()).await?);
158 if any_not_controller(&second, |o| o.error.as_ref()) {
159 return Err(AdminError::NotControllerExhausted);
160 }
161 Ok(second)
162 }
163
164 async fn refresh_controller_connection(&mut self) -> Result<(), AdminError> {
168 let md_resp = self.conn.send(build_metadata(&[])).await?;
169 let controller_addr =
170 controller_endpoint(&md_resp).ok_or(AdminError::NotControllerExhausted)?;
171 self.reconnect(&controller_addr).await
172 }
173}
174
175fn any_not_controller<T, F: Fn(&T) -> Option<&KafkaError>>(items: &[T], get_err: F) -> bool {
176 items
177 .iter()
178 .any(|o| matches!(get_err(o), Some(e) if e.code == NOT_CONTROLLER))
179}
180
181fn build_metadata(topics: &[&str]) -> MetadataRequest {
182 MetadataRequest {
183 topics: if topics.is_empty() {
184 None
185 } else {
186 Some(
187 topics
188 .iter()
189 .map(|n| MetadataRequestTopic {
190 topic_id: ProtoUuid::ZERO,
191 name: Some((*n).to_string()),
192 ..Default::default()
193 })
194 .collect(),
195 )
196 },
197 allow_auto_topic_creation: false,
198 include_cluster_authorized_operations: false,
199 include_topic_authorized_operations: false,
200 ..Default::default()
201 }
202}
203
204fn build_create_topics(specs: &[CreateTopicSpec], timeout_ms: i32) -> CreateTopicsRequest {
205 CreateTopicsRequest {
206 topics: specs
207 .iter()
208 .map(|s| CreatableTopic {
209 name: s.name.clone(),
210 num_partitions: s.partitions,
211 replication_factor: i16::try_from(s.replicas).unwrap_or(i16::MAX),
212 assignments: Vec::new(),
213 configs: s
214 .configs
215 .iter()
216 .map(|(k, v)| CreatableTopicConfig {
217 name: k.clone(),
218 value: Some(v.clone()),
219 ..Default::default()
220 })
221 .collect(),
222 ..Default::default()
223 })
224 .collect(),
225 timeout_ms,
226 validate_only: false,
227 ..Default::default()
228 }
229}
230
231fn parse_create_topics(
232 resp: <CreateTopicsRequest as crabka_protocol::ProtocolRequest>::Response,
233) -> Vec<CreateTopicOutcome> {
234 resp.topics
235 .into_iter()
236 .map(|t| CreateTopicOutcome {
237 name: t.name,
238 topic_id: proto_uuid_to_opt(t.topic_id),
239 error: error_if(t.error_code, t.error_message),
240 })
241 .collect()
242}
243
244fn parse_delete_topics(
245 resp: <DeleteTopicsRequest as crabka_protocol::ProtocolRequest>::Response,
246) -> Vec<DeleteTopicOutcome> {
247 resp.responses
248 .into_iter()
249 .map(|t| DeleteTopicOutcome {
250 name: t.name.unwrap_or_default(),
251 error: error_if(t.error_code, t.error_message),
252 })
253 .collect()
254}
255
256fn parse_create_partitions(
257 resp: <CreatePartitionsRequest as crabka_protocol::ProtocolRequest>::Response,
258) -> Vec<CreatePartitionsOutcome> {
259 resp.results
260 .into_iter()
261 .map(|t| CreatePartitionsOutcome {
262 name: t.name,
263 error: error_if(t.error_code, t.error_message),
264 })
265 .collect()
266}
267
268fn parse_metadata(
269 resp: <MetadataRequest as crabka_protocol::ProtocolRequest>::Response,
270) -> TopicMetadata {
271 let topics = resp
272 .topics
273 .into_iter()
274 .map(|t| {
275 let partition_count = i32::try_from(t.partitions.len()).unwrap_or(i32::MAX);
276 let replication_factor = i32::from(t.partitions.first().map_or(0, |p| {
277 i16::try_from(p.replica_nodes.len()).unwrap_or(i16::MAX)
278 }));
279 TopicMetadataEntry {
280 name: t.name.unwrap_or_default(),
281 topic_id: proto_uuid_to_opt(t.topic_id),
282 partition_count,
283 replication_factor,
284 error: error_if(t.error_code, None),
285 }
286 })
287 .collect();
288 TopicMetadata {
289 controller_id: resp.controller_id,
290 topics,
291 }
292}
293
294fn controller_endpoint(
295 resp: &<MetadataRequest as crabka_protocol::ProtocolRequest>::Response,
296) -> Option<String> {
297 let id = resp.controller_id;
298 resp.brokers
299 .iter()
300 .find(|b| b.node_id == id)
301 .map(|b| format!("{}:{}", b.host, b.port))
302}
303
304fn proto_uuid_to_opt(u: ProtoUuid) -> Option<Uuid> {
305 if u == ProtoUuid::ZERO {
306 None
307 } else {
308 Some(Uuid::from_bytes(u.0))
309 }
310}
311
312fn error_if(code: i16, message: Option<String>) -> Option<KafkaError> {
313 if code == 0 {
314 None
315 } else {
316 Some(KafkaError {
317 code,
318 name: kafka_error_name(code),
319 message,
320 })
321 }
322}
323
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_protocol::owned::{
        create_partitions_response::{CreatePartitionsResponse, CreatePartitionsTopicResult},
        create_topics_response::{CreatableTopicResult, CreateTopicsResponse},
        delete_topics_response::{DeletableTopicResult, DeleteTopicsResponse},
        metadata_response::{
            MetadataResponse, MetadataResponseBroker, MetadataResponsePartition,
            MetadataResponseTopic,
        },
    };
    use std::collections::BTreeMap;

    /// Fixture: a create-topic outcome without a topic id.
    fn outcome(name: &str, error: Option<KafkaError>) -> CreateTopicOutcome {
        CreateTopicOutcome {
            name: name.into(),
            topic_id: None,
            error,
        }
    }

    /// Fixture: a message-less error with the given code and name.
    fn kafka_error(code: i16, name: &'static str) -> KafkaError {
        KafkaError {
            code,
            name,
            message: None,
        }
    }

    /// A single spec is translated field-by-field into the request.
    #[test]
    fn build_create_topics_one_spec() {
        let spec = CreateTopicSpec {
            name: "foo".into(),
            partitions: 3,
            replicas: 1,
            configs: BTreeMap::from([("retention.ms".to_string(), "60000".to_string())]),
        };
        let request = build_create_topics(&[spec], 5_000);
        assert!(request.topics.len() == 1);
        let topic = &request.topics[0];
        assert!(topic.name == "foo");
        assert!(topic.num_partitions == 3);
        assert!(topic.replication_factor == 1);
        assert!(topic.configs.len() == 1);
        assert!(topic.configs[0].name == "retention.ms");
        assert!(topic.configs[0].value.as_deref() == Some("60000"));
        assert!(request.timeout_ms == 5_000);
        assert!(!request.validate_only);
    }

    /// Error code zero means "no error".
    #[test]
    fn error_if_zero_code_is_none() {
        assert!(error_if(0, None).is_none());
    }

    /// A non-zero code keeps its code, resolved name and message.
    #[test]
    fn error_if_nonzero_carries_name() {
        let error = error_if(36, Some("dup".into())).unwrap();
        assert!(error.code == 36);
        assert!(error.name == "TOPIC_ALREADY_EXISTS");
        assert!(error.message.as_deref() == Some("dup"));
    }

    /// One NOT_CONTROLLER outcome is enough to trip the predicate; a list
    /// without errors does not.
    #[test]
    fn any_not_controller_predicate_matches_code_41() {
        let mixed = vec![
            outcome("a", None),
            outcome("b", Some(kafka_error(NOT_CONTROLLER, "NOT_CONTROLLER"))),
        ];
        assert!(any_not_controller(&mixed, |o| o.error.as_ref()));

        let clean = vec![outcome("a", None)];
        assert!(!any_not_controller(&clean, |o| o.error.as_ref()));
    }

    /// Errors other than NOT_CONTROLLER must not trip the predicate.
    #[test]
    fn any_not_controller_ignores_other_errors() {
        let unrelated = vec![outcome(
            "b",
            Some(kafka_error(36, "TOPIC_ALREADY_EXISTS")),
        )];
        assert!(!any_not_controller(&unrelated, |o| o.error.as_ref()));
    }

    /// The broker whose node id equals the controller id provides the address.
    #[test]
    fn controller_endpoint_picks_broker_with_matching_node_id() {
        let first = MetadataResponseBroker {
            node_id: 1,
            host: "h1".into(),
            port: 9092,
            rack: None,
            ..Default::default()
        };
        let second = MetadataResponseBroker {
            node_id: 2,
            host: "h2".into(),
            port: 9093,
            rack: None,
            ..Default::default()
        };
        let response = MetadataResponse {
            controller_id: 2,
            brokers: vec![first, second],
            ..Default::default()
        };
        let addr = controller_endpoint(&response);
        assert!(addr.as_deref() == Some("h2:9093"));
    }

    /// A controller id that matches no broker yields no address.
    #[test]
    fn controller_endpoint_returns_none_when_no_match() {
        let only = MetadataResponseBroker {
            node_id: 1,
            host: "h1".into(),
            port: 9092,
            rack: None,
            ..Default::default()
        };
        let response = MetadataResponse {
            controller_id: 99,
            brokers: vec![only],
            ..Default::default()
        };
        assert!(controller_endpoint(&response).is_none());
    }

    /// Per-topic error codes survive parsing, with their names resolved.
    #[test]
    fn parse_metadata_carries_through_per_topic_errors() {
        let healthy = MetadataResponseTopic {
            name: Some("ok-topic".into()),
            error_code: 0,
            ..Default::default()
        };
        let unknown = MetadataResponseTopic {
            name: Some("missing".into()),
            error_code: 3,
            ..Default::default()
        };
        let response = MetadataResponse {
            topics: vec![healthy, unknown],
            ..Default::default()
        };
        let md = parse_metadata(response);
        assert!(md.topics.len() == 2);
        assert!(md.topics[0].name == "ok-topic");
        assert!(md.topics[0].error.is_none());
        assert!(md.topics[1].name == "missing");
        let err = md.topics[1].error.as_ref().expect("error expected");
        assert!(err.code == 3);
        assert!(err.name == "UNKNOWN_TOPIC_OR_PARTITION");
    }

    /// The all-zero topic id is surfaced as `None`.
    #[test]
    fn parse_metadata_zero_uuid_becomes_none() {
        let topic = MetadataResponseTopic {
            name: Some("foo".into()),
            topic_id: ProtoUuid::ZERO,
            ..Default::default()
        };
        let response = MetadataResponse {
            topics: vec![topic],
            ..Default::default()
        };
        let md = parse_metadata(response);
        assert!(md.topics[0].topic_id.is_none());
    }

    /// Three partitions with two replicas each give count 3, factor 2.
    #[test]
    fn parse_metadata_computes_partition_count_and_replication_factor() {
        let partition = MetadataResponsePartition {
            replica_nodes: vec![1, 2],
            ..Default::default()
        };
        let topic = MetadataResponseTopic {
            name: Some("foo".into()),
            partitions: vec![partition; 3],
            ..Default::default()
        };
        let response = MetadataResponse {
            topics: vec![topic],
            ..Default::default()
        };
        let md = parse_metadata(response);
        assert!(md.topics[0].partition_count == 3);
        assert!(md.topics[0].replication_factor == 2);
    }

    /// Successful and failed topics are both reported, each with its own
    /// error state.
    #[test]
    fn parse_create_topics_per_topic_error() {
        let created = CreatableTopicResult {
            name: "ok".into(),
            topic_id: ProtoUuid([7; 16]),
            error_code: 0,
            error_message: None,
            ..Default::default()
        };
        let duplicate = CreatableTopicResult {
            name: "dup".into(),
            error_code: 36,
            error_message: Some("already there".into()),
            ..Default::default()
        };
        let response = CreateTopicsResponse {
            topics: vec![created, duplicate],
            ..Default::default()
        };
        let outcomes = parse_create_topics(response);
        assert!(outcomes.len() == 2);
        assert!(outcomes[0].name == "ok");
        assert!(outcomes[0].error.is_none());
        assert!(
            outcomes[0].topic_id.is_some(),
            "non-zero uuid should map to Some"
        );

        assert!(outcomes[1].name == "dup");
        let err = outcomes[1].error.as_ref().expect("error expected");
        assert!(err.code == 36);
        assert!(err.name == "TOPIC_ALREADY_EXISTS");
        assert!(err.message.as_deref() == Some("already there"));
    }

    /// A response entry without a name is reported with an empty name.
    #[test]
    fn parse_delete_topics_handles_missing_name() {
        let nameless = DeletableTopicResult {
            name: None,
            error_code: 0,
            ..Default::default()
        };
        let failed = DeletableTopicResult {
            name: Some("named".into()),
            error_code: 3,
            error_message: Some("nope".into()),
            ..Default::default()
        };
        let response = DeleteTopicsResponse {
            responses: vec![nameless, failed],
            ..Default::default()
        };
        let outs = parse_delete_topics(response);
        assert!(outs.len() == 2);
        assert!(outs[0].name == String::new());
        assert!(outs[0].error.is_none());
        assert!(outs[1].name == "named");
        let err = outs[1].error.as_ref().expect("error expected");
        assert!(err.code == 3);
        assert!(err.name == "UNKNOWN_TOPIC_OR_PARTITION");
        assert!(err.message.as_deref() == Some("nope"));
    }

    /// Successful and failed results are both reported, each with its own
    /// error state.
    #[test]
    fn parse_create_partitions_per_topic_error() {
        let grown = CreatePartitionsTopicResult {
            name: "ok".into(),
            error_code: 0,
            error_message: None,
            ..Default::default()
        };
        let rejected = CreatePartitionsTopicResult {
            name: "bad".into(),
            error_code: 37,
            error_message: Some("bad count".into()),
            ..Default::default()
        };
        let response = CreatePartitionsResponse {
            results: vec![grown, rejected],
            ..Default::default()
        };
        let outs = parse_create_partitions(response);
        assert!(outs.len() == 2);
        assert!(outs[0].name == "ok");
        assert!(outs[0].error.is_none());
        assert!(outs[1].name == "bad");
        let err = outs[1].error.as_ref().expect("error expected");
        assert!(err.code == 37);
        assert!(err.name == "INVALID_PARTITIONS");
        assert!(err.message.as_deref() == Some("bad count"));
    }
}