rabbitmq_stream_client/client/
metadata.rs

1use std::collections::HashMap;
2
3use rabbitmq_stream_protocol::{commands::metadata::MetadataResponse, ResponseCode};
4
/// Network location of a broker, as listed in a metadata response.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Broker {
    /// Host name or address the broker advertises.
    pub host: String,
    /// Port the broker advertises.
    pub port: u32,
}
10
11#[derive(Debug, PartialEq, Eq)]
12pub struct StreamMetadata {
13    pub stream: String,
14    pub response_code: ResponseCode,
15    pub leader: Broker,
16    pub replicas: Vec<Broker>,
17}
18
19pub fn from_response(response: MetadataResponse) -> HashMap<String, StreamMetadata> {
20    let brokers: HashMap<u16, Broker> = response
21        .brokers
22        .into_iter()
23        .map(|broker| {
24            (
25                broker.reference,
26                Broker {
27                    host: broker.host,
28                    port: broker.port,
29                },
30            )
31        })
32        .collect();
33
34    response
35        .stream_metadata
36        .into_iter()
37        .filter_map(|metadata| {
38            brokers.get(&metadata.leader_reference).map(|leader| {
39                (
40                    metadata.stream_name.clone(),
41                    StreamMetadata {
42                        stream: metadata.stream_name,
43                        response_code: metadata.code,
44                        leader: leader.clone(),
45                        replicas: metadata
46                            .replicas_references
47                            .into_iter()
48                            .filter_map(|replica| brokers.get(&replica).cloned())
49                            .collect(),
50                    },
51                )
52            })
53        })
54        .collect()
55}