bmp_protocol/
decoder.rs

1use crate::{
2    Error,
3    Result,
4    types::*,
5};
6
7use bgp_rs::Capabilities;
8use bytes::{
9    Buf,
10    buf::BufExt,
11    BytesMut
12};
13use hashbrown::HashMap;
14use tokio_util::codec::Decoder;
15
16use std::convert::TryInto;
17use std::net::IpAddr;
18
/// Number of leading bytes that must be buffered before the message length is known:
/// the 1-byte version followed by the 4-byte length field.
const BMP_HEADER_LEN: usize = 5;
21
22/// Work out the common set of capabilities on a peering session
23fn common_capabilities(source: &Capabilities, other: &Capabilities) -> Capabilities {
24    // And (manually) build an intersection between the two
25    let mut negotiated = Capabilities::default();
26
27    negotiated.MP_BGP_SUPPORT = source
28        .MP_BGP_SUPPORT
29        .intersection(&other.MP_BGP_SUPPORT)
30        .copied()
31        .collect();
32    negotiated.ROUTE_REFRESH_SUPPORT = source.ROUTE_REFRESH_SUPPORT & other.ROUTE_REFRESH_SUPPORT;
33    negotiated.OUTBOUND_ROUTE_FILTERING_SUPPORT = source
34        .OUTBOUND_ROUTE_FILTERING_SUPPORT
35        .intersection(&other.OUTBOUND_ROUTE_FILTERING_SUPPORT)
36        .copied()
37        .collect();
38
39    // Attempt at a HashMap intersection. We can be a bit lax here because this isn't a real BGP implementation
40    // so we can not care too much about the values for now.
41    negotiated.EXTENDED_NEXT_HOP_ENCODING = source
42        .EXTENDED_NEXT_HOP_ENCODING
43        .iter()
44        // .filter(|((afi, safi), _)| other.EXTENDED_NEXT_HOP_ENCODING.contains_key(&(*afi, *safi)))
45        .map(|((afi, safi), nexthop)| ((*afi, *safi), *nexthop))
46        .collect();
47
48    negotiated.BGPSEC_SUPPORT = source.BGPSEC_SUPPORT & other.BGPSEC_SUPPORT;
49
50    negotiated.MULTIPLE_LABELS_SUPPORT = source
51        .MULTIPLE_LABELS_SUPPORT
52        .iter()
53        .filter(|((afi, safi), _)| other.MULTIPLE_LABELS_SUPPORT.contains_key(&(*afi, *safi)))
54        .map(|((afi, safi), val)| ((*afi, *safi), *val))
55        .collect();
56
57    negotiated.GRACEFUL_RESTART_SUPPORT = source
58        .GRACEFUL_RESTART_SUPPORT
59        .intersection(&other.GRACEFUL_RESTART_SUPPORT)
60        .copied()
61        .collect();
62    negotiated.FOUR_OCTET_ASN_SUPPORT =
63        source.FOUR_OCTET_ASN_SUPPORT & other.FOUR_OCTET_ASN_SUPPORT;
64
65    negotiated.ADD_PATH_SUPPORT = source
66        .ADD_PATH_SUPPORT
67        .iter()
68        .filter(|((afi, safi), _)| other.ADD_PATH_SUPPORT.contains_key(&(*afi, *safi)))
69        .map(|((afi, safi), val)| ((*afi, *safi), *val))
70        .collect();
71    negotiated.EXTENDED_PATH_NLRI_SUPPORT = !negotiated.ADD_PATH_SUPPORT.is_empty();
72
73    negotiated.ENHANCED_ROUTE_REFRESH_SUPPORT =
74        source.ENHANCED_ROUTE_REFRESH_SUPPORT & other.ENHANCED_ROUTE_REFRESH_SUPPORT;
75    negotiated.LONG_LIVED_GRACEFUL_RESTART =
76        source.LONG_LIVED_GRACEFUL_RESTART & other.LONG_LIVED_GRACEFUL_RESTART;
77
78    negotiated
79}
80
/// Which part of a BMP message the decoder expects to read next.
#[derive(Clone, Debug)]
enum DecoderState {
    /// Waiting for the leading version and length bytes of the next message.
    Head,
    /// Header already consumed; carries `(version, remaining_length)` for the
    /// rest of the message that still has to be read.
    Data((u8, usize)),
}
86
87/// Decoder implementation for use with a FramedReader
88#[derive(Clone, Debug)]
89pub struct BmpDecoder {
90    client_capabilities: HashMap<IpAddr, Capabilities>,
91    state: DecoderState,
92}
93
94impl BmpDecoder {
95    /// Create a new instance of the Decoder
96    pub fn new() -> Self {
97        Self {
98            client_capabilities: HashMap::new(),
99            state: DecoderState::Head,
100        }
101    }
102
103    fn decode_head(&mut self, src: &mut BytesMut) -> Result<Option<(u8, usize)>> {
104        if src.len() < BMP_HEADER_LEN {
105            return Ok(None);
106        }
107
108        let version = src.get_u8();
109        let length = src.get_u32() as usize;
110        let remaining = length - BMP_HEADER_LEN;
111
112        src.reserve(remaining);
113        tracing::trace!(buf_capacity = %src.capacity());
114
115        Ok(Some((version, remaining)))
116    }
117
118    fn decode_data(&mut self, version: u8, length: usize, src: &mut BytesMut) -> Result<Option<BmpMessage>> {
119        // The BytesMut should already have the required capacity reserved so if we haven't read
120        // the entire message yet, just keep on reading!
121        if src.len() < length {
122            return Ok(None);
123        }
124
125        // Now we take the message while leaving anything else in the buffer
126        let mut buf = src.split_to(length);
127
128        // Now decode based on the MessageKind
129        let kind: MessageKind = buf.get_u8().try_into()?;
130        let message = match kind {
131            MessageKind::Initiation => {
132                let mut tlv = vec![];
133                while buf.remaining() > 0 {
134                    let kind = buf.get_u16();
135
136                    let info = match kind {
137                        x if x <= 2 => InformationTlv::decode(kind, &mut buf)?,
138                        _ => { break; }
139                    };
140
141                    tlv.push(info);
142                }
143
144                MessageData::Initiation(tlv)
145            },
146            MessageKind::PeerUp => {
147                let peer_header = PeerHeader::decode(&mut buf)?;
148                let message = PeerUp::decode(&peer_header.peer_flags, &mut buf)?;
149
150                // Record the speaker capabilities, we'll use these later
151                self.client_capabilities.entry(peer_header.peer_addr)
152                    .or_insert_with(|| {
153                        match (&message.sent_open, &message.recv_open) {
154                            (Some(s), Some(r)) => {
155                                let local_caps = Capabilities::from_parameters(s.parameters.clone());
156                                let remote_caps = Capabilities::from_parameters(r.parameters.clone());
157
158                                let mut caps = common_capabilities(&local_caps, &remote_caps);
159
160                                // Use the BMP header val, not the negotiated val
161                                if !peer_header.peer_flags.A { caps.FOUR_OCTET_ASN_SUPPORT = true; }
162                                caps
163                            },
164                            _ => {
165                                tracing::warn!("Missing BGP OPENs (local: {} remote: {}", message.local_addr, peer_header.peer_addr);
166
167                                let mut caps = Capabilities::default();
168                                if !peer_header.peer_flags.A { caps.FOUR_OCTET_ASN_SUPPORT = true; }
169
170                                caps
171                            }
172                        }
173                    });
174                    // .or_insert_with(|| Capabilities::common(&message.sent_open, &message.recv_open).expect("missing capabilities"));
175
176                MessageData::PeerUp((peer_header, message))
177            },
178            MessageKind::PeerDown => {
179                // Make sure to clean up self.capabilities
180                let peer_header = PeerHeader::decode(&mut buf)?;
181                let message = PeerDown::decode(&mut buf)?;
182
183                self.client_capabilities.remove(&peer_header.peer_addr);
184
185                MessageData::PeerDown((peer_header, message))
186            },
187            MessageKind::RouteMonitoring => {
188                let peer_header = PeerHeader::decode(&mut buf)?;
189                let capabilities = self.client_capabilities.get(&peer_header.peer_addr)
190                    // .ok_or_else(|| format_err!("No capabilities found for neighbor {}", peer_header.peer_addr))?;
191                    .ok_or_else(|| Error::decode(&format!("No capabilities found for neighbor {}", peer_header.peer_addr)))?;
192
193                let mut rdr = buf.reader();
194                let header = bgp_rs::Header::parse(&mut rdr)?;
195                let update = bgp_rs::Update::parse(&header, &mut rdr, &capabilities)?;
196
197                MessageData::RouteMonitoring((peer_header, update))
198            },
199            _ => MessageData::Unimplemented
200        };
201
202        Ok(
203            Some(BmpMessage { version, kind, message })
204        )
205    }
206}
207
208impl Decoder for BmpDecoder {
209    type Item = BmpMessage;
210    type Error = std::io::Error;
211
212    fn decode(&mut self, src: &mut BytesMut) -> std::io::Result<Option<BmpMessage>> {
213        let (version, length) = match self.state {
214            DecoderState::Head => {
215                match self.decode_head(src)? {
216                    Some((ver, len)) => {
217                        self.state = DecoderState::Data((ver, len));
218                        (ver, len)
219                    },
220                    None => return Ok(None)
221                }
222            },
223            DecoderState::Data((ver, len)) => (ver, len)
224        };
225
226        match self.decode_data(version, length, src)? {
227            Some(message) => {
228                self.state = DecoderState::Head;
229                src.reserve(BMP_HEADER_LEN);
230
231                Ok(Some(message))
232            },
233            None => Ok(None)
234        }
235    }
236}