1use crate::{
2 Error,
3 Result,
4 types::*,
5};
6
7use bgp_rs::Capabilities;
8use bytes::{
9 Buf,
10 buf::BufExt,
11 BytesMut
12};
13use hashbrown::HashMap;
14use tokio_util::codec::Decoder;
15
16use std::convert::TryInto;
17use std::net::IpAddr;
18
/// Number of bytes `decode_head` consumes: a one-byte version followed by a
/// four-byte message length. The message-type byte is not counted here; it is
/// read as the first byte of the data section in `decode_data`.
const BMP_HEADER_LEN: usize = 5;
21
22fn common_capabilities(source: &Capabilities, other: &Capabilities) -> Capabilities {
24 let mut negotiated = Capabilities::default();
26
27 negotiated.MP_BGP_SUPPORT = source
28 .MP_BGP_SUPPORT
29 .intersection(&other.MP_BGP_SUPPORT)
30 .copied()
31 .collect();
32 negotiated.ROUTE_REFRESH_SUPPORT = source.ROUTE_REFRESH_SUPPORT & other.ROUTE_REFRESH_SUPPORT;
33 negotiated.OUTBOUND_ROUTE_FILTERING_SUPPORT = source
34 .OUTBOUND_ROUTE_FILTERING_SUPPORT
35 .intersection(&other.OUTBOUND_ROUTE_FILTERING_SUPPORT)
36 .copied()
37 .collect();
38
39 negotiated.EXTENDED_NEXT_HOP_ENCODING = source
42 .EXTENDED_NEXT_HOP_ENCODING
43 .iter()
44 .map(|((afi, safi), nexthop)| ((*afi, *safi), *nexthop))
46 .collect();
47
48 negotiated.BGPSEC_SUPPORT = source.BGPSEC_SUPPORT & other.BGPSEC_SUPPORT;
49
50 negotiated.MULTIPLE_LABELS_SUPPORT = source
51 .MULTIPLE_LABELS_SUPPORT
52 .iter()
53 .filter(|((afi, safi), _)| other.MULTIPLE_LABELS_SUPPORT.contains_key(&(*afi, *safi)))
54 .map(|((afi, safi), val)| ((*afi, *safi), *val))
55 .collect();
56
57 negotiated.GRACEFUL_RESTART_SUPPORT = source
58 .GRACEFUL_RESTART_SUPPORT
59 .intersection(&other.GRACEFUL_RESTART_SUPPORT)
60 .copied()
61 .collect();
62 negotiated.FOUR_OCTET_ASN_SUPPORT =
63 source.FOUR_OCTET_ASN_SUPPORT & other.FOUR_OCTET_ASN_SUPPORT;
64
65 negotiated.ADD_PATH_SUPPORT = source
66 .ADD_PATH_SUPPORT
67 .iter()
68 .filter(|((afi, safi), _)| other.ADD_PATH_SUPPORT.contains_key(&(*afi, *safi)))
69 .map(|((afi, safi), val)| ((*afi, *safi), *val))
70 .collect();
71 negotiated.EXTENDED_PATH_NLRI_SUPPORT = !negotiated.ADD_PATH_SUPPORT.is_empty();
72
73 negotiated.ENHANCED_ROUTE_REFRESH_SUPPORT =
74 source.ENHANCED_ROUTE_REFRESH_SUPPORT & other.ENHANCED_ROUTE_REFRESH_SUPPORT;
75 negotiated.LONG_LIVED_GRACEFUL_RESTART =
76 source.LONG_LIVED_GRACEFUL_RESTART & other.LONG_LIVED_GRACEFUL_RESTART;
77
78 negotiated
79}
80
/// Which part of a BMP message the decoder expects next.
#[derive(Clone, Debug)]
enum DecoderState {
    /// The next bytes are a message header (version and length).
    Head,
    /// The header has been consumed. Holds the `(version, remaining_length)`
    /// pair returned by `decode_head`, where the remaining length is the
    /// number of bytes still to be read for this message.
    Data((u8, usize))
}
86
/// Stateful decoder that turns a byte stream into `BmpMessage` values.
#[derive(Clone, Debug)]
pub struct BmpDecoder {
    /// Capabilities stored per peer address: inserted when a Peer Up message
    /// is decoded, removed on Peer Down, and looked up to parse the BGP
    /// update carried by Route Monitoring messages.
    client_capabilities: HashMap<IpAddr, Capabilities>,
    /// Whether a header or a message body is expected next.
    state: DecoderState,
}
93
94impl BmpDecoder {
95 pub fn new() -> Self {
97 Self {
98 client_capabilities: HashMap::new(),
99 state: DecoderState::Head,
100 }
101 }
102
103 fn decode_head(&mut self, src: &mut BytesMut) -> Result<Option<(u8, usize)>> {
104 if src.len() < BMP_HEADER_LEN {
105 return Ok(None);
106 }
107
108 let version = src.get_u8();
109 let length = src.get_u32() as usize;
110 let remaining = length - BMP_HEADER_LEN;
111
112 src.reserve(remaining);
113 tracing::trace!(buf_capacity = %src.capacity());
114
115 Ok(Some((version, remaining)))
116 }
117
118 fn decode_data(&mut self, version: u8, length: usize, src: &mut BytesMut) -> Result<Option<BmpMessage>> {
119 if src.len() < length {
122 return Ok(None);
123 }
124
125 let mut buf = src.split_to(length);
127
128 let kind: MessageKind = buf.get_u8().try_into()?;
130 let message = match kind {
131 MessageKind::Initiation => {
132 let mut tlv = vec![];
133 while buf.remaining() > 0 {
134 let kind = buf.get_u16();
135
136 let info = match kind {
137 x if x <= 2 => InformationTlv::decode(kind, &mut buf)?,
138 _ => { break; }
139 };
140
141 tlv.push(info);
142 }
143
144 MessageData::Initiation(tlv)
145 },
146 MessageKind::PeerUp => {
147 let peer_header = PeerHeader::decode(&mut buf)?;
148 let message = PeerUp::decode(&peer_header.peer_flags, &mut buf)?;
149
150 self.client_capabilities.entry(peer_header.peer_addr)
152 .or_insert_with(|| {
153 match (&message.sent_open, &message.recv_open) {
154 (Some(s), Some(r)) => {
155 let local_caps = Capabilities::from_parameters(s.parameters.clone());
156 let remote_caps = Capabilities::from_parameters(r.parameters.clone());
157
158 let mut caps = common_capabilities(&local_caps, &remote_caps);
159
160 if !peer_header.peer_flags.A { caps.FOUR_OCTET_ASN_SUPPORT = true; }
162 caps
163 },
164 _ => {
165 tracing::warn!("Missing BGP OPENs (local: {} remote: {}", message.local_addr, peer_header.peer_addr);
166
167 let mut caps = Capabilities::default();
168 if !peer_header.peer_flags.A { caps.FOUR_OCTET_ASN_SUPPORT = true; }
169
170 caps
171 }
172 }
173 });
174 MessageData::PeerUp((peer_header, message))
177 },
178 MessageKind::PeerDown => {
179 let peer_header = PeerHeader::decode(&mut buf)?;
181 let message = PeerDown::decode(&mut buf)?;
182
183 self.client_capabilities.remove(&peer_header.peer_addr);
184
185 MessageData::PeerDown((peer_header, message))
186 },
187 MessageKind::RouteMonitoring => {
188 let peer_header = PeerHeader::decode(&mut buf)?;
189 let capabilities = self.client_capabilities.get(&peer_header.peer_addr)
190 .ok_or_else(|| Error::decode(&format!("No capabilities found for neighbor {}", peer_header.peer_addr)))?;
192
193 let mut rdr = buf.reader();
194 let header = bgp_rs::Header::parse(&mut rdr)?;
195 let update = bgp_rs::Update::parse(&header, &mut rdr, &capabilities)?;
196
197 MessageData::RouteMonitoring((peer_header, update))
198 },
199 _ => MessageData::Unimplemented
200 };
201
202 Ok(
203 Some(BmpMessage { version, kind, message })
204 )
205 }
206}
207
208impl Decoder for BmpDecoder {
209 type Item = BmpMessage;
210 type Error = std::io::Error;
211
212 fn decode(&mut self, src: &mut BytesMut) -> std::io::Result<Option<BmpMessage>> {
213 let (version, length) = match self.state {
214 DecoderState::Head => {
215 match self.decode_head(src)? {
216 Some((ver, len)) => {
217 self.state = DecoderState::Data((ver, len));
218 (ver, len)
219 },
220 None => return Ok(None)
221 }
222 },
223 DecoderState::Data((ver, len)) => (ver, len)
224 };
225
226 match self.decode_data(version, length, src)? {
227 Some(message) => {
228 self.state = DecoderState::Head;
229 src.reserve(BMP_HEADER_LEN);
230
231 Ok(Some(message))
232 },
233 None => Ok(None)
234 }
235 }
236}