rabbitmq_stream_client/client/
metadata.rs1use std::collections::HashMap;
2
3use rabbitmq_stream_protocol::{commands::metadata::MetadataResponse, ResponseCode};
4
5#[derive(Debug, PartialEq, Eq, Clone)]
6pub struct Broker {
7 pub host: String,
8 pub port: u32,
9}
10
11#[derive(Debug, PartialEq, Eq)]
12pub struct StreamMetadata {
13 pub stream: String,
14 pub response_code: ResponseCode,
15 pub leader: Broker,
16 pub replicas: Vec<Broker>,
17}
18
19pub fn from_response(response: MetadataResponse) -> HashMap<String, StreamMetadata> {
20 let brokers: HashMap<u16, Broker> = response
21 .brokers
22 .into_iter()
23 .map(|broker| {
24 (
25 broker.reference,
26 Broker {
27 host: broker.host,
28 port: broker.port,
29 },
30 )
31 })
32 .collect();
33
34 response
35 .stream_metadata
36 .into_iter()
37 .filter_map(|metadata| {
38 brokers.get(&metadata.leader_reference).map(|leader| {
39 (
40 metadata.stream_name.clone(),
41 StreamMetadata {
42 stream: metadata.stream_name,
43 response_code: metadata.code,
44 leader: leader.clone(),
45 replicas: metadata
46 .replicas_references
47 .into_iter()
48 .filter_map(|replica| brokers.get(&replica).cloned())
49 .collect(),
50 },
51 )
52 })
53 })
54 .collect()
55}