Skip to main content

crabka_client_admin/
topics.rs

//! Topic administration wrappers: metadata lookup, create, delete, and partition-count increase.
2
3use std::collections::BTreeMap;
4
5use crabka_protocol::owned::{
6    create_partitions_request::{CreatePartitionsRequest, CreatePartitionsTopic},
7    create_topics_request::{CreatableTopic, CreatableTopicConfig, CreateTopicsRequest},
8    delete_topics_request::{DeleteTopicState, DeleteTopicsRequest},
9    metadata_request::{MetadataRequest, MetadataRequestTopic},
10};
11use crabka_protocol::primitives::uuid::Uuid as ProtoUuid;
12use uuid::Uuid;
13
14use crate::{AdminClient, AdminError, KafkaError, NOT_CONTROLLER, kafka_error_name};
15
/// Caller-supplied description of one topic to create.
#[derive(Clone, Debug)]
pub struct CreateTopicSpec {
    /// Topic name.
    pub name: String,
    /// Number of partitions to request for the topic.
    pub partitions: i32,
    /// Replication factor; narrowed to the request's `i16` field when the
    /// `CreateTopics` request is built.
    pub replicas: i32,
    /// Topic-level config entries, sent as name/value pairs.
    pub configs: BTreeMap<String, String>,
}
23
24#[derive(Debug, Clone)]
25pub struct CreateTopicOutcome {
26    pub name: String,
27    pub topic_id: Option<Uuid>,
28    pub error: Option<KafkaError>,
29}
30
31#[derive(Debug, Clone)]
32pub struct DeleteTopicOutcome {
33    pub name: String,
34    pub error: Option<KafkaError>,
35}
36
/// One topic's entry in a `CreatePartitions` call.
#[derive(Clone, Debug)]
pub struct CreatePartitionsOp {
    /// Topic whose partition count should change.
    pub name: String,
    /// Value sent as the request's `count` field for this topic.
    pub new_total_count: i32,
}
42
43#[derive(Debug, Clone)]
44pub struct CreatePartitionsOutcome {
45    pub name: String,
46    pub error: Option<KafkaError>,
47}
48
49#[derive(Debug, Clone, Default)]
50pub struct TopicMetadata {
51    pub controller_id: i32,
52    pub topics: Vec<TopicMetadataEntry>,
53}
54
55#[derive(Debug, Clone)]
56pub struct TopicMetadataEntry {
57    pub name: String,
58    pub topic_id: Option<Uuid>,
59    pub partition_count: i32,
60    pub replication_factor: i32,
61    pub error: Option<KafkaError>,
62}
63
64impl AdminClient {
65    /// Metadata for the named topics. Pass an empty slice to fetch all
66    /// topics, per Kafka semantics.
67    pub async fn metadata(&mut self, topics: &[&str]) -> Result<TopicMetadata, AdminError> {
68        let req = build_metadata(topics);
69        let resp = self.conn.send(req).await?;
70        Ok(parse_metadata(resp))
71    }
72
73    pub async fn create_topics(
74        &mut self,
75        specs: &[CreateTopicSpec],
76        timeout_ms: i32,
77    ) -> Result<Vec<CreateTopicOutcome>, AdminError> {
78        let first = {
79            let req = build_create_topics(specs, timeout_ms);
80            let resp = self.conn.send(req).await?;
81            parse_create_topics(resp)
82        };
83        if !any_not_controller(&first, |o| o.error.as_ref()) {
84            return Ok(first);
85        }
86        self.refresh_controller_connection().await?;
87        let second = {
88            let req = build_create_topics(specs, timeout_ms);
89            let resp = self.conn.send(req).await?;
90            parse_create_topics(resp)
91        };
92        if any_not_controller(&second, |o| o.error.as_ref()) {
93            return Err(AdminError::NotControllerExhausted);
94        }
95        Ok(second)
96    }
97
98    pub async fn delete_topics(
99        &mut self,
100        names: &[&str],
101        timeout_ms: i32,
102    ) -> Result<Vec<DeleteTopicOutcome>, AdminError> {
103        // Populate BOTH fields so the request works regardless of the
104        // negotiated protocol version: `topic_names` is the legacy field
105        // (v0-v5) and `topics` is the v6+ replacement. The
106        // `ApiVersionTable`-driven encoder picks the version-relevant
107        // field and ignores the other.
108        let build = || DeleteTopicsRequest {
109            topic_names: names.iter().map(|s| (*s).to_string()).collect(),
110            topics: names
111                .iter()
112                .map(|s| DeleteTopicState {
113                    name: Some((*s).to_string()),
114                    topic_id: ProtoUuid::ZERO,
115                    ..Default::default()
116                })
117                .collect(),
118            timeout_ms,
119            ..Default::default()
120        };
121        let first = parse_delete_topics(self.conn.send(build()).await?);
122        if !any_not_controller(&first, |o| o.error.as_ref()) {
123            return Ok(first);
124        }
125        self.refresh_controller_connection().await?;
126        let second = parse_delete_topics(self.conn.send(build()).await?);
127        if any_not_controller(&second, |o| o.error.as_ref()) {
128            return Err(AdminError::NotControllerExhausted);
129        }
130        Ok(second)
131    }
132
133    pub async fn create_partitions(
134        &mut self,
135        ops: &[CreatePartitionsOp],
136        timeout_ms: i32,
137    ) -> Result<Vec<CreatePartitionsOutcome>, AdminError> {
138        let build = || CreatePartitionsRequest {
139            topics: ops
140                .iter()
141                .map(|o| CreatePartitionsTopic {
142                    name: o.name.clone(),
143                    count: o.new_total_count,
144                    assignments: None,
145                    ..Default::default()
146                })
147                .collect(),
148            timeout_ms,
149            validate_only: false,
150            ..Default::default()
151        };
152        let first = parse_create_partitions(self.conn.send(build()).await?);
153        if !any_not_controller(&first, |o| o.error.as_ref()) {
154            return Ok(first);
155        }
156        self.refresh_controller_connection().await?;
157        let second = parse_create_partitions(self.conn.send(build()).await?);
158        if any_not_controller(&second, |o| o.error.as_ref()) {
159            return Err(AdminError::NotControllerExhausted);
160        }
161        Ok(second)
162    }
163
164    /// Fetch Metadata, find the controller's `host:port`, and replace
165    /// `self.conn` with a connection to it. Used by the per-method
166    /// `NOT_CONTROLLER` retry paths above.
167    async fn refresh_controller_connection(&mut self) -> Result<(), AdminError> {
168        let md_resp = self.conn.send(build_metadata(&[])).await?;
169        let controller_addr =
170            controller_endpoint(&md_resp).ok_or(AdminError::NotControllerExhausted)?;
171        self.reconnect(&controller_addr).await
172    }
173}
174
175fn any_not_controller<T, F: Fn(&T) -> Option<&KafkaError>>(items: &[T], get_err: F) -> bool {
176    items
177        .iter()
178        .any(|o| matches!(get_err(o), Some(e) if e.code == NOT_CONTROLLER))
179}
180
181fn build_metadata(topics: &[&str]) -> MetadataRequest {
182    MetadataRequest {
183        topics: if topics.is_empty() {
184            None
185        } else {
186            Some(
187                topics
188                    .iter()
189                    .map(|n| MetadataRequestTopic {
190                        topic_id: ProtoUuid::ZERO,
191                        name: Some((*n).to_string()),
192                        ..Default::default()
193                    })
194                    .collect(),
195            )
196        },
197        allow_auto_topic_creation: false,
198        include_cluster_authorized_operations: false,
199        include_topic_authorized_operations: false,
200        ..Default::default()
201    }
202}
203
204fn build_create_topics(specs: &[CreateTopicSpec], timeout_ms: i32) -> CreateTopicsRequest {
205    CreateTopicsRequest {
206        topics: specs
207            .iter()
208            .map(|s| CreatableTopic {
209                name: s.name.clone(),
210                num_partitions: s.partitions,
211                replication_factor: i16::try_from(s.replicas).unwrap_or(i16::MAX),
212                assignments: Vec::new(),
213                configs: s
214                    .configs
215                    .iter()
216                    .map(|(k, v)| CreatableTopicConfig {
217                        name: k.clone(),
218                        value: Some(v.clone()),
219                        ..Default::default()
220                    })
221                    .collect(),
222                ..Default::default()
223            })
224            .collect(),
225        timeout_ms,
226        validate_only: false,
227        ..Default::default()
228    }
229}
230
231fn parse_create_topics(
232    resp: <CreateTopicsRequest as crabka_protocol::ProtocolRequest>::Response,
233) -> Vec<CreateTopicOutcome> {
234    resp.topics
235        .into_iter()
236        .map(|t| CreateTopicOutcome {
237            name: t.name,
238            topic_id: proto_uuid_to_opt(t.topic_id),
239            error: error_if(t.error_code, t.error_message),
240        })
241        .collect()
242}
243
244fn parse_delete_topics(
245    resp: <DeleteTopicsRequest as crabka_protocol::ProtocolRequest>::Response,
246) -> Vec<DeleteTopicOutcome> {
247    resp.responses
248        .into_iter()
249        .map(|t| DeleteTopicOutcome {
250            name: t.name.unwrap_or_default(),
251            error: error_if(t.error_code, t.error_message),
252        })
253        .collect()
254}
255
256fn parse_create_partitions(
257    resp: <CreatePartitionsRequest as crabka_protocol::ProtocolRequest>::Response,
258) -> Vec<CreatePartitionsOutcome> {
259    resp.results
260        .into_iter()
261        .map(|t| CreatePartitionsOutcome {
262            name: t.name,
263            error: error_if(t.error_code, t.error_message),
264        })
265        .collect()
266}
267
268fn parse_metadata(
269    resp: <MetadataRequest as crabka_protocol::ProtocolRequest>::Response,
270) -> TopicMetadata {
271    let topics = resp
272        .topics
273        .into_iter()
274        .map(|t| {
275            let partition_count = i32::try_from(t.partitions.len()).unwrap_or(i32::MAX);
276            let replication_factor = i32::from(t.partitions.first().map_or(0, |p| {
277                i16::try_from(p.replica_nodes.len()).unwrap_or(i16::MAX)
278            }));
279            TopicMetadataEntry {
280                name: t.name.unwrap_or_default(),
281                topic_id: proto_uuid_to_opt(t.topic_id),
282                partition_count,
283                replication_factor,
284                error: error_if(t.error_code, None),
285            }
286        })
287        .collect();
288    TopicMetadata {
289        controller_id: resp.controller_id,
290        topics,
291    }
292}
293
294fn controller_endpoint(
295    resp: &<MetadataRequest as crabka_protocol::ProtocolRequest>::Response,
296) -> Option<String> {
297    let id = resp.controller_id;
298    resp.brokers
299        .iter()
300        .find(|b| b.node_id == id)
301        .map(|b| format!("{}:{}", b.host, b.port))
302}
303
304fn proto_uuid_to_opt(u: ProtoUuid) -> Option<Uuid> {
305    if u == ProtoUuid::ZERO {
306        None
307    } else {
308        Some(Uuid::from_bytes(u.0))
309    }
310}
311
312fn error_if(code: i16, message: Option<String>) -> Option<KafkaError> {
313    if code == 0 {
314        None
315    } else {
316        Some(KafkaError {
317            code,
318            name: kafka_error_name(code),
319            message,
320        })
321    }
322}
323
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_protocol::owned::{
        create_partitions_response::{CreatePartitionsResponse, CreatePartitionsTopicResult},
        create_topics_response::{CreatableTopicResult, CreateTopicsResponse},
        delete_topics_response::{DeletableTopicResult, DeleteTopicsResponse},
        metadata_response::{
            MetadataResponse, MetadataResponseBroker, MetadataResponsePartition,
            MetadataResponseTopic,
        },
    };
    use std::collections::BTreeMap;

    // ── fixtures ───────────────────────────────────────────────────

    /// A `CreateTopicOutcome` with no topic id and the given error.
    fn outcome(name: &str, error: Option<KafkaError>) -> CreateTopicOutcome {
        CreateTopicOutcome {
            name: name.into(),
            topic_id: None,
            error,
        }
    }

    /// A `KafkaError` without a message.
    fn bare_error(code: i16, name: &'static str) -> KafkaError {
        KafkaError {
            code,
            name,
            message: None,
        }
    }

    // ── build_create_topics / error_if ─────────────────────────────

    #[test]
    fn build_create_topics_one_spec() {
        let spec = CreateTopicSpec {
            name: "foo".into(),
            partitions: 3,
            replicas: 1,
            configs: BTreeMap::from([("retention.ms".to_string(), "60000".to_string())]),
        };
        let req = build_create_topics(&[spec], 5_000);

        assert!(req.timeout_ms == 5_000);
        assert!(!req.validate_only);
        assert!(req.topics.len() == 1);

        let topic = &req.topics[0];
        assert!(topic.name == "foo");
        assert!(topic.num_partitions == 3);
        assert!(topic.replication_factor == 1);
        assert!(topic.configs.len() == 1);

        let config = &topic.configs[0];
        assert!(config.name == "retention.ms");
        assert!(config.value.as_deref() == Some("60000"));
    }

    #[test]
    fn error_if_zero_code_is_none() {
        let mapped = error_if(0, None);
        assert!(mapped.is_none());
    }

    #[test]
    fn error_if_nonzero_carries_name() {
        let err = error_if(36, Some("dup".into())).unwrap();
        assert!(err.code == 36);
        assert!(err.name == "TOPIC_ALREADY_EXISTS");
        assert!(err.message.as_deref() == Some("dup"));
    }

    // ── NOT_CONTROLLER retry predicate ─────────────────────────────
    //
    // The end-to-end retry (NOT_CONTROLLER in the first response →
    // controller lookup → reconnect → successful second response) runs
    // against a real broker in `tests/round_trip.rs`. The unit tests here
    // pin the two pure building blocks: the predicate that decides
    // whether to retry, and the metadata-response → host:port resolver.

    /// Spec test name: `not_controller_triggers_one_retry` (predicate
    /// half). `any_not_controller` is `true` iff at least one outcome
    /// carries the `NOT_CONTROLLER (41)` error code.
    #[test]
    fn any_not_controller_predicate_matches_code_41() {
        let mixed = [
            outcome("a", None),
            outcome("b", Some(bare_error(NOT_CONTROLLER, "NOT_CONTROLLER"))),
        ];
        assert!(any_not_controller(&mixed, |o| o.error.as_ref()));

        let clean = [outcome("a", None)];
        assert!(!any_not_controller(&clean, |o| o.error.as_ref()));
    }

    /// Spec test name: `repeated_not_controller_errors_return_exhausted`
    /// (predicate half). Only code 41 may trigger the retry path; if the
    /// predicate fired on, say, `TOPIC_ALREADY_EXISTS`, callers would see
    /// spurious reconnects and `NotControllerExhausted` on real failures.
    #[test]
    fn any_not_controller_ignores_other_errors() {
        // 36 = TOPIC_ALREADY_EXISTS
        let unrelated = [outcome("b", Some(bare_error(36, "TOPIC_ALREADY_EXISTS")))];
        assert!(!any_not_controller(&unrelated, |o| o.error.as_ref()));
    }

    // ── controller_endpoint resolver ───────────────────────────────

    /// Spec test name: `connect_walks_bootstrap_list` (resolver half; the
    /// bootstrap-walking integration coverage lives in
    /// `tests/connect.rs`). `controller_endpoint` yields the `host:port`
    /// of the broker whose `node_id` equals the response's
    /// `controller_id` — the address the `NOT_CONTROLLER` retry path
    /// reconnects to.
    #[test]
    fn controller_endpoint_picks_broker_with_matching_node_id() {
        let first = MetadataResponseBroker {
            node_id: 1,
            host: "h1".into(),
            port: 9092,
            rack: None,
            ..Default::default()
        };
        let second = MetadataResponseBroker {
            node_id: 2,
            host: "h2".into(),
            port: 9093,
            rack: None,
            ..Default::default()
        };
        let resp = MetadataResponse {
            controller_id: 2,
            brokers: vec![first, second],
            ..Default::default()
        };
        assert!(controller_endpoint(&resp).as_deref() == Some("h2:9093"));
    }

    /// A controller id absent from the broker list (e.g. mid-failover)
    /// resolves to `None`, which the retry path maps to
    /// `AdminError::NotControllerExhausted`.
    #[test]
    fn controller_endpoint_returns_none_when_no_match() {
        let only_broker = MetadataResponseBroker {
            node_id: 1,
            host: "h1".into(),
            port: 9092,
            rack: None,
            ..Default::default()
        };
        let resp = MetadataResponse {
            controller_id: 99,
            brokers: vec![only_broker],
            ..Default::default()
        };
        assert!(controller_endpoint(&resp).is_none());
    }

    // ── parse_metadata ─────────────────────────────────────────────
    //
    // `parse_metadata` is the pure response → `TopicMetadata` projection
    // behind the live `metadata` RPC. These tests feed it synthetic
    // responses and check the error mapping, zero-uuid handling, and the
    // partition / replication-factor counts.

    #[test]
    fn parse_metadata_carries_through_per_topic_errors() {
        let healthy = MetadataResponseTopic {
            name: Some("ok-topic".into()),
            error_code: 0,
            ..Default::default()
        };
        let absent = MetadataResponseTopic {
            name: Some("missing".into()),
            error_code: 3, // UNKNOWN_TOPIC_OR_PARTITION
            ..Default::default()
        };
        let parsed = parse_metadata(MetadataResponse {
            topics: vec![healthy, absent],
            ..Default::default()
        });

        assert!(parsed.topics.len() == 2);
        let (ok_entry, missing_entry) = (&parsed.topics[0], &parsed.topics[1]);
        assert!(ok_entry.name == "ok-topic");
        assert!(ok_entry.error.is_none());
        assert!(missing_entry.name == "missing");
        let err = missing_entry.error.as_ref().expect("error expected");
        assert!(err.code == 3);
        assert!(err.name == "UNKNOWN_TOPIC_OR_PARTITION");
    }

    #[test]
    fn parse_metadata_zero_uuid_becomes_none() {
        let topic = MetadataResponseTopic {
            name: Some("foo".into()),
            topic_id: ProtoUuid::ZERO,
            ..Default::default()
        };
        let parsed = parse_metadata(MetadataResponse {
            topics: vec![topic],
            ..Default::default()
        });
        assert!(parsed.topics[0].topic_id.is_none());
    }

    #[test]
    fn parse_metadata_computes_partition_count_and_replication_factor() {
        let partition = MetadataResponsePartition {
            replica_nodes: vec![1, 2],
            ..Default::default()
        };
        let topic = MetadataResponseTopic {
            name: Some("foo".into()),
            partitions: vec![partition; 3],
            ..Default::default()
        };
        let parsed = parse_metadata(MetadataResponse {
            topics: vec![topic],
            ..Default::default()
        });
        let entry = &parsed.topics[0];
        assert!(entry.partition_count == 3);
        assert!(entry.replication_factor == 2);
    }

    // ── parse_create_topics ────────────────────────────────────────

    #[test]
    fn parse_create_topics_per_topic_error() {
        let created = CreatableTopicResult {
            name: "ok".into(),
            topic_id: ProtoUuid([7; 16]),
            error_code: 0,
            error_message: None,
            ..Default::default()
        };
        let duplicate = CreatableTopicResult {
            name: "dup".into(),
            error_code: 36, // TOPIC_ALREADY_EXISTS
            error_message: Some("already there".into()),
            ..Default::default()
        };
        let outcomes = parse_create_topics(CreateTopicsResponse {
            topics: vec![created, duplicate],
            ..Default::default()
        });

        assert!(outcomes.len() == 2);
        let (ok, dup) = (&outcomes[0], &outcomes[1]);

        assert!(ok.name == "ok");
        assert!(ok.error.is_none());
        assert!(ok.topic_id.is_some(), "non-zero uuid should map to Some");

        assert!(dup.name == "dup");
        let err = dup.error.as_ref().expect("error expected");
        assert!(err.code == 36);
        assert!(err.name == "TOPIC_ALREADY_EXISTS");
        assert!(err.message.as_deref() == Some("already there"));
    }

    // ── parse_delete_topics ────────────────────────────────────────

    #[test]
    fn parse_delete_topics_handles_missing_name() {
        let nameless = DeletableTopicResult {
            name: None,
            error_code: 0,
            ..Default::default()
        };
        let failed = DeletableTopicResult {
            name: Some("named".into()),
            error_code: 3,
            error_message: Some("nope".into()),
            ..Default::default()
        };
        let outs = parse_delete_topics(DeleteTopicsResponse {
            responses: vec![nameless, failed],
            ..Default::default()
        });

        assert!(outs.len() == 2);
        let (anonymous, named) = (&outs[0], &outs[1]);

        // `name: None` goes through `unwrap_or_default()` → empty string.
        assert!(anonymous.name.is_empty());
        assert!(anonymous.error.is_none());

        assert!(named.name == "named");
        let err = named.error.as_ref().expect("error expected");
        assert!(err.code == 3);
        assert!(err.name == "UNKNOWN_TOPIC_OR_PARTITION");
        assert!(err.message.as_deref() == Some("nope"));
    }

    // ── parse_create_partitions ────────────────────────────────────

    #[test]
    fn parse_create_partitions_per_topic_error() {
        let grown = CreatePartitionsTopicResult {
            name: "ok".into(),
            error_code: 0,
            error_message: None,
            ..Default::default()
        };
        let rejected = CreatePartitionsTopicResult {
            name: "bad".into(),
            error_code: 37,
            error_message: Some("bad count".into()),
            ..Default::default()
        };
        let outs = parse_create_partitions(CreatePartitionsResponse {
            results: vec![grown, rejected],
            ..Default::default()
        });

        assert!(outs.len() == 2);
        let (ok, bad) = (&outs[0], &outs[1]);

        assert!(ok.name == "ok");
        assert!(ok.error.is_none());

        assert!(bad.name == "bad");
        let err = bad.error.as_ref().expect("error expected");
        assert!(err.code == 37);
        assert!(err.name == "INVALID_PARTITIONS");
        assert!(err.message.as_deref() == Some("bad count"));
    }
}