birdc/
connection.rs

1//! Module that deals with connection and protocol related logic.
2//!
3//! Refer to documentation of [Connection] for more details.
4
5use std::{
6    collections::VecDeque,
7    io::{ErrorKind, Read, Write},
8    path::Path,
9};
10
11use crate::{
12    Error, Interface, InterfaceAddress, InterfaceProperties, InterfaceSummary, InvalidTokenError,
13    Message, Protocol, ProtocolDetail, Result, ShowInterfacesMessage, ShowProtocolDetailsMessage,
14    ShowStatusMessage,
15};
16
/// An active connection, on which requests can be executed, and responses
/// received.
///
/// The request/response mechanism is serial, so if a request is made
/// before the response from the previous one has been fully received, you'll
/// get a [Error::OperationInProgress] error.
pub struct Connection {
    /// Async unix socket over which requests are written and responses read.
    stream: tokio::net::UnixStream,
    /// Bytes received from the server that could not yet be turned into a
    /// complete [Message].
    unparsed_bytes: Vec<u8>,
    /// Messages that were parsed already and are waiting to be handed out by
    /// `next_message`.
    unsent_messages: VecDeque<Message>,
    /// `true` while the greeting or a request is still awaiting its
    /// terminating [Message::Ok]; `send_request` refuses to run while set.
    request_in_progress: bool,
}
29
30impl Connection {
31    /// Open a new connection to this `unix_socket`, and consumes the
32    /// introductory welcome message before returning the [Connection]
33    pub(crate) async fn new<P: AsRef<Path>>(unix_socket: P) -> Result<Self> {
34        // connect to the unix socket
35        let stream = tokio::net::UnixStream::connect(unix_socket).await?;
36
37        let mut connection = Connection {
38            stream,
39            unparsed_bytes: Vec::with_capacity(2 * READ_FRAME_SIZE),
40            unsent_messages: VecDeque::with_capacity(20),
41            // we mark this true because of the initial greeting
42            request_in_progress: true,
43        };
44
45        // process greeting and return
46        if let Message::Welcome(ref greeting) = connection.next_message().await? {
47            log::trace!("received greeting {greeting}");
48            // we need to do this because the message processor automatically adds an Ok
49            if let Message::Ok = connection.next_message().await? {
50                log::trace!("handshake completed. connection active");
51                connection.allow_new_requests();
52                return Ok(connection);
53            }
54        }
55        Err(Error::InvalidToken(InvalidTokenError::Other(
56            "did not find greeting".into(),
57        )))
58    }
59
    /// Mark current request/response session as completed, so that new requests can
    /// be made on this connection.
    ///
    /// This clears the `request_in_progress` flag, which `send_request` checks
    /// before writing anything to the server.
    #[inline]
    fn allow_new_requests(&mut self) {
        self.request_in_progress = false;
    }
66
67    /// Sends a request to the server and gets a vec of response messages. The
68    /// terminating [Message::Ok] is not included.
69    pub async fn send_request(&mut self, request: &str) -> Result<Vec<Message>> {
70        // if there's already a request in progress, we shouldn't be sending
71        // another request as we won't be able to differentiate the response
72        if self.request_in_progress {
73            return Err(Error::OperationInProgress);
74        }
75
76        // make sure we've cleared pending bytes & messages
77        self.unparsed_bytes.clear();
78        self.unsent_messages.clear();
79
80        // send the request
81        let request = if request.ends_with('\n') {
82            request.to_owned()
83        } else {
84            format!("{}\n", &request)
85        };
86        let mut result: Vec<Message> = Vec::new();
87        self.write_to_server(&request).await?;
88        self.request_in_progress = true; // mark as operation in progress
89
90        // receive all response messages
91        loop {
92            let message = self.next_message().await?;
93            if let Message::Ok = message {
94                self.allow_new_requests();
95                return Ok(result);
96            } else {
97                result.push(message);
98            }
99        }
100    }
101
102    /// Sends a `show interfaces summary` request and returns the parsed response as a
103    /// list of [InterfaceSummary] entries, one each for an interface.
104    pub async fn show_interfaces_summary(&mut self) -> Result<Vec<InterfaceSummary>> {
105        let messages = self.send_request("show interfaces summary").await?;
106
107        // we ignore the 2005 message, and focus only on 1005
108        for message in &messages {
109            // if we get a 1005, we process it and return it
110            if let Message::InterfaceSummary(_) = message {
111                return if let Some(ifcs) = InterfaceSummary::from_enum(message) {
112                    Ok(ifcs)
113                } else {
114                    Err(Error::ParseError(messages))
115                };
116            }
117        }
118
119        // if we didn't encounter any 1005, we return a ParseError
120        Err(Error::ParseError(messages))
121    }
122
123    /// Sends a `show interfaces` request and returns the parsed response as a
124    /// list of [ShowInterfacesMessage] entries, one each for an interface.
125    pub async fn show_interfaces(&mut self) -> Result<Vec<ShowInterfacesMessage>> {
126        let messages = self.send_request("show interfaces").await?;
127        handle_show_interfaces(messages)
128    }
129
130    /// Sends a `show protocols [<pattern>]` request and returns the parsed response as a
131    /// list of [InterfaceSummary] entries, one for each protocol.
132    ///
133    /// If `pattern` is specified, results of only those protocols is returned, which
134    /// match the pattern.
135    pub async fn show_protocols(&mut self, pattern: Option<&str>) -> Result<Vec<Protocol>> {
136        let cmd = if let Some(pattern) = pattern {
137            format!("show protocols \"{pattern}\"")
138        } else {
139            "show protocols".into()
140        };
141        let messages = self.send_request(&cmd).await?;
142        handle_show_protocols(messages)
143    }
144
145    /// Sends a `show protocols all [<pattern>]` request and returns the parsed response as a
146    /// list of [ShowProtocolDetailsMessage] entries, one for each protocol instance.
147    ///
148    /// If `pattern` is specified, results of only those protocols is returned, which
149    /// match the pattern.
150    pub async fn show_protocols_details(
151        &mut self,
152        pattern: Option<&str>,
153    ) -> Result<Vec<ShowProtocolDetailsMessage>> {
154        let cmd = if let Some(pattern) = pattern {
155            format!("show protocols all \"{pattern}\"")
156        } else {
157            "show protocols all".into()
158        };
159        let messages = self.send_request(&cmd).await?;
160        handle_show_protocols_details(messages)
161    }
162
163    /// Sends a `show status` request, and returns a semantically parsed response
164    /// in the form of [ShowStatusMessage]
165    pub async fn show_status(&mut self) -> Result<ShowStatusMessage> {
166        let messages = self.send_request("show status").await?;
167
168        match ShowStatusMessage::from_messages(&messages) {
169            Some(ssm) => Ok(ssm),
170            None => Err(Error::ParseError(messages)),
171        }
172    }
173
174    /// Reads a full [Message] from the server, and returns it
175    async fn next_message(&mut self) -> Result<Message> {
176        // if we have pending messages, return the first one
177        if let Some(pending_message) = self.unsent_messages.pop_front() {
178            return Ok(pending_message);
179        }
180
181        // we are here because we don't have sufficient data in unparsed_bytes
182        // to create a new message, so we have to fetch more
183        self.fetch_new_messages().await?;
184        if let Some(new_message) = self.unsent_messages.pop_front() {
185            Ok(new_message)
186        } else {
187            // if we didn't get any message, there's something wrong
188            Err(Error::eof("was expecting more messages"))
189        }
190    }
191
192    /// Writes `request` to the server, returning only after it has been written
193    /// fully.
194    async fn write_to_server(&self, request: &str) -> Result<()> {
195        let data = request.as_bytes();
196        let total_size = data.len();
197        let mut written_size = 0;
198        loop {
199            self.stream.writable().await?;
200            match self.stream.try_write(data) {
201                Ok(n) => {
202                    written_size += n;
203                    if written_size >= total_size {
204                        return Ok(());
205                    }
206                }
207                Err(err) => {
208                    if err.kind() != ErrorKind::WouldBlock {
209                        return Err(Error::from(err));
210                    }
211                }
212            }
213        }
214    }
215
216    /// Fetches and add news messages to the queue.
217    #[inline]
218    async fn fetch_new_messages(&mut self) -> Result<()> {
219        loop {
220            self.stream.readable().await?;
221            let mut frame = [0_u8; READ_FRAME_SIZE];
222            match self.stream.try_read(&mut frame) {
223                Ok(0) => {
224                    return Err(Error::eof("premature EOF"));
225                }
226                Ok(count) => {
227                    if enqueue_messages(
228                        &frame[..count],
229                        &mut self.unparsed_bytes,
230                        &mut self.unsent_messages,
231                    )? == 0
232                    {
233                        // we continue to fetch more if amount of data
234                        // was insufficient to parse response
235                        continue;
236                    } else {
237                        return Ok(());
238                    }
239                }
240                Err(err) => {
241                    if err.kind() != ErrorKind::WouldBlock {
242                        return Err(Error::IoError(err));
243                    }
244                }
245            }
246        }
247    }
248}
249
/// A non-async version of [Connection]
pub struct SyncConnection {
    /// Unix socket over which requests are written and responses read.
    stream: std::os::unix::net::UnixStream,
    /// Bytes received from the server that could not yet be turned into a
    /// complete [Message].
    unparsed_bytes: Vec<u8>,
    /// Messages that were parsed already and are waiting to be handed out by
    /// `next_message`.
    unsent_messages: VecDeque<Message>,
    /// `true` while the greeting or a request is still awaiting its
    /// terminating [Message::Ok]; `send_request` refuses to run while set.
    request_in_progress: bool,
}
257
258impl SyncConnection {
259    /// Open a new connection to this `unix_socket`, and consumes the
260    /// introductory welcome message before returning the [Connection]
261    pub(crate) fn new<P: AsRef<Path>>(unix_socket: P) -> Result<Self> {
262        let stream = std::os::unix::net::UnixStream::connect(unix_socket)?;
263
264        let mut connection = SyncConnection {
265            stream,
266            unparsed_bytes: Vec::with_capacity(2 * READ_FRAME_SIZE),
267            unsent_messages: VecDeque::with_capacity(20),
268            request_in_progress: true,
269        };
270
271        if let Message::Welcome(ref greeting) = connection.next_message()? {
272            log::trace!("received greeting {greeting}");
273            if let Message::Ok = connection.next_message()? {
274                log::trace!("handshake completed. connection active");
275                connection.allow_new_requests();
276                return Ok(connection);
277            }
278        }
279        Err(Error::InvalidToken(InvalidTokenError::Other(
280            "did not find greeting".into(),
281        )))
282    }
283
    /// Mark current request/response session as completed, so that new requests can
    /// be made on this connection.
    ///
    /// This clears the `request_in_progress` flag, which `send_request` checks
    /// before writing anything to the server.
    #[inline]
    fn allow_new_requests(&mut self) {
        self.request_in_progress = false;
    }
290
291    /// Sends a request to the server and gets a vec of response messages. The
292    /// terminating [Message::Ok] is not included.
293    pub fn send_request(&mut self, request: &str) -> Result<Vec<Message>> {
294        if self.request_in_progress {
295            return Err(Error::OperationInProgress);
296        }
297
298        self.unparsed_bytes.clear();
299        self.unsent_messages.clear();
300
301        let request = if request.ends_with('\n') {
302            request.to_owned()
303        } else {
304            format!("{}\n", &request)
305        };
306        let mut result: Vec<Message> = Vec::new();
307        self.write_to_server(&request)?;
308        self.request_in_progress = true;
309
310        loop {
311            let message = self.next_message()?;
312            if let Message::Ok = message {
313                self.allow_new_requests();
314                return Ok(result);
315            } else {
316                result.push(message);
317            }
318        }
319    }
320
321    /// Sends a `show interfaces summary` request and returns the parsed response as a
322    /// list of [InterfaceSummary] entries, one each for an interface.
323    pub fn show_interfaces_summary(&mut self) -> Result<Vec<InterfaceSummary>> {
324        let messages = self.send_request("show interfaces summary")?;
325
326        for message in &messages {
327            if let Message::InterfaceSummary(_) = message {
328                return if let Some(ifcs) = InterfaceSummary::from_enum(message) {
329                    Ok(ifcs)
330                } else {
331                    Err(Error::ParseError(messages))
332                };
333            }
334        }
335
336        Err(Error::ParseError(messages))
337    }
338
339    /// Sends a `show interfaces` request and returns the parsed response as a
340    /// list of [ShowInterfacesMessage] entries, one each for an interface.
341    pub fn show_interfaces(&mut self) -> Result<Vec<ShowInterfacesMessage>> {
342        let messages = self.send_request("show interfaces")?;
343        handle_show_interfaces(messages)
344    }
345
346    /// Sends a `show protocols [<pattern>]` request and returns the parsed response as a
347    /// list of [InterfaceSummary] entries, one for each protocol.
348    ///
349    /// If `pattern` is specified, results of only those protocols is returned, which
350    /// match the pattern.
351    pub fn show_protocols(&mut self, pattern: Option<&str>) -> Result<Vec<Protocol>> {
352        let cmd = if let Some(pattern) = pattern {
353            format!("show protocols \"{pattern}\"")
354        } else {
355            "show protocols".into()
356        };
357        let messages = self.send_request(&cmd)?;
358        handle_show_protocols(messages)
359    }
360
361    /// Sends a `show protocols all [<pattern>]` request and returns the parsed response as a
362    /// list of [ShowProtocolDetailsMessage] entries, one for each protocol instance.
363    ///
364    /// If `pattern` is specified, results of only those protocols is returned, which
365    /// match the pattern.
366    pub fn show_protocols_details(
367        &mut self,
368        pattern: Option<&str>,
369    ) -> Result<Vec<ShowProtocolDetailsMessage>> {
370        let cmd = if let Some(pattern) = pattern {
371            format!("show protocols all \"{pattern}\"")
372        } else {
373            "show protocols all".into()
374        };
375        let messages = self.send_request(&cmd)?;
376        handle_show_protocols_details(messages)
377    }
378
379    /// Sends a `show status` request, and returns a semantically parsed response
380    /// in the form of [ShowStatusMessage]
381    pub fn show_status(&mut self) -> Result<ShowStatusMessage> {
382        let messages = self.send_request("show status")?;
383
384        match ShowStatusMessage::from_messages(&messages) {
385            Some(ssm) => Ok(ssm),
386            None => Err(Error::ParseError(messages)),
387        }
388    }
389
390    /// Reads a full [Message] from the server, and returns it
391    fn next_message(&mut self) -> Result<Message> {
392        if let Some(pending_message) = self.unsent_messages.pop_front() {
393            return Ok(pending_message);
394        }
395
396        self.fetch_new_messages()?;
397        if let Some(new_message) = self.unsent_messages.pop_front() {
398            Ok(new_message)
399        } else {
400            Err(Error::eof("was expecting more messages"))
401        }
402    }
403
404    /// Writes `request` to the server, returning only after it has been written
405    /// fully.
406    fn write_to_server(&mut self, request: &str) -> Result<()> {
407        let data = request.as_bytes();
408        let total_size = data.len();
409        let mut written_size = 0;
410        loop {
411            match self.stream.write(data) {
412                Ok(n) => {
413                    written_size += n;
414                    if written_size >= total_size {
415                        return Ok(());
416                    }
417                }
418                Err(err) => {
419                    if err.kind() != ErrorKind::WouldBlock {
420                        return Err(Error::from(err));
421                    }
422                }
423            }
424        }
425    }
426
427    /// Fetches and add news messages to the queue.
428    #[inline]
429    fn fetch_new_messages(&mut self) -> Result<()> {
430        loop {
431            let mut frame = [0_u8; READ_FRAME_SIZE];
432            match self.stream.read(&mut frame) {
433                Ok(0) => {
434                    return Err(Error::eof("premature EOF"));
435                }
436                Ok(count) => {
437                    if enqueue_messages(
438                        &frame[..count],
439                        &mut self.unparsed_bytes,
440                        &mut self.unsent_messages,
441                    )? == 0
442                    {
443                        continue;
444                    } else {
445                        return Ok(());
446                    }
447                }
448                Err(err) => {
449                    if err.kind() != ErrorKind::WouldBlock {
450                        return Err(Error::IoError(err));
451                    }
452                }
453            }
454        }
455    }
456}
457
458fn handle_show_interfaces(messages: Vec<Message>) -> Result<Vec<ShowInterfacesMessage>> {
459    let mut result = vec![];
460
461    // we expect messages to show up as a series of triplets: 1001, 1004 and 1003
462    let mut idx = 0;
463    loop {
464        // each iteration here means fully going through all of 1001, 1004 and 1003
465
466        // if we're already at end, return
467        if idx >= messages.len() {
468            return Ok(result);
469        }
470
471        // start processing
472        let first_msg = &messages[idx];
473        idx += 1;
474
475        // process only if we find the first entry to be a 1001
476        if let Some(msg_1001) = Interface::from_enum(first_msg) {
477            // get the position of the next 1001
478            let next_1001_idx = messages[idx..]
479                .iter()
480                .position(|x| matches!(x, Message::InterfaceList(_)))
481                .unwrap_or(messages.len() - idx)
482                + idx;
483            let delta = next_1001_idx - idx;
484            if delta == 0 || delta > 2 {
485                log::error!(
486                    "conn: parse failed: a 1001 entry without (or more than one) 1003/1004",
487                );
488                return Err(Error::ParseError(messages));
489            }
490            let mut msg_1004: Option<InterfaceProperties> = None;
491            let mut msg_1003: Option<Vec<InterfaceAddress>> = None;
492            while idx < next_1001_idx {
493                let cur_msg = &messages[idx];
494                idx += 1;
495                match cur_msg {
496                    Message::InterfaceFlags(_) => {
497                        if let Some(props) = InterfaceProperties::from_enum(cur_msg) {
498                            msg_1004 = Some(props);
499                        } else {
500                            return Err(Error::ParseError(messages));
501                        }
502                    }
503                    Message::InterfaceAddress(_) => {
504                        if let Some(addrs) = InterfaceAddress::from_enum(cur_msg) {
505                            msg_1003 = Some(addrs);
506                        } else {
507                            return Err(Error::ParseError(messages));
508                        }
509                    }
510                    _ => {
511                        log::error!(
512                            "conn: parse failed: found invalid code {}",
513                            messages[idx].code()
514                        );
515                        return Err(Error::ParseError(messages));
516                    }
517                }
518            }
519            if let Some(msg_1004) = msg_1004 {
520                result.push(ShowInterfacesMessage {
521                    interface: msg_1001,
522                    properties: msg_1004,
523                    addresses: msg_1003.unwrap_or_default(),
524                });
525            } else {
526                log::error!("conn: parse failed: found a 1001 without a 1004");
527                return Err(Error::ParseError(messages));
528            }
529        } else {
530            return Err(Error::ParseError(messages));
531        }
532    }
533}
534
535fn handle_show_protocols(messages: Vec<Message>) -> Result<Vec<Protocol>> {
536    // we ignore the 2002 message, and focus only on 1005
537    for message in &messages {
538        // if we get a 1002, we process it and return it
539        if let Message::ProtocolList(_) = message {
540            return if let Some(protocols) = Protocol::from_enum(message) {
541                Ok(protocols)
542            } else {
543                Err(Error::ParseError(messages))
544            };
545        }
546    }
547
548    // if we didn't encounter any 1002, we return a ParseError
549    Err(Error::ParseError(messages))
550}
551
552fn handle_show_protocols_details(
553    messages: Vec<Message>,
554) -> Result<Vec<ShowProtocolDetailsMessage>> {
555    let mut result = vec![];
556    // we expect messages to show up as a series of doublets: 1002 and 1006
557    if let Some(mut idx) = messages
558        .iter()
559        .position(|x| matches!(x, Message::ProtocolList(_)))
560    {
561        while idx < messages.len() {
562            // each iteration here means fully going through the pair of 1002 and 1006
563
564            if let Some(protocol) = Protocol::from_enum(&messages[idx]) {
565                // move index to the next message
566                idx += 1;
567                // if we have a valid 1002 ...
568                if let Some(protocol) = protocol.first() {
569                    // ... check for 1006
570                    if idx == messages.len()
571                        || !matches!(messages[idx], Message::ProtocolDetails(_))
572                    {
573                        // if we're already at the end, or if there's no 1006 after the 1002 we saw
574                        // just a step before, we push the current 1002 without any 1006, and continue
575                        result.push(ShowProtocolDetailsMessage {
576                            protocol: protocol.clone(),
577                            detail: None,
578                        });
579                        continue;
580                    }
581                    // looks like we have a valid 1006, so let's process it
582                    if let Some(detail) = ProtocolDetail::from_enum(&messages[idx]) {
583                        // looks like we got a valid 1006
584                        idx += 1;
585                        result.push(ShowProtocolDetailsMessage {
586                            protocol: protocol.clone(),
587                            detail: Some(detail),
588                        });
589                    } else {
590                        log::error!("conn: failed to parse 1006 message");
591                        return Err(Error::ParseError(messages));
592                    }
593                }
594            } else {
595                log::error!("conn: failed to parse 1002 message: {:?}", messages[idx]);
596                return Err(Error::ParseError(messages));
597            }
598        }
599
600        Ok(result)
601    } else {
602        // No 1002 entries, so empty result
603        Ok(Vec::new())
604    }
605}
606
607/// Process raw bytes to parse and enqueue messages. On success returns the
608/// number of messages enqueued.
609///
610/// If we have pending unparsed bytes from previous iterations, we create a
611/// new buffer that combines the old one with the new `frame`, and then
612/// processes it.
613///
614/// However, if we don't have any pending unparsed bytes, then it would be
615/// an overhead to do so, so we just process the frame directly.
616///
617/// In both cases, pending bytes from this iteration are added to
618/// `unparsed_bytes`
619#[inline]
620fn enqueue_messages(
621    frame: &[u8],
622    unparsed_bytes: &mut Vec<u8>,
623    unsent_messages: &mut VecDeque<Message>,
624) -> Result<usize> {
625    let num_unparsed = unparsed_bytes.len();
626    let has_unparsed = num_unparsed > 0;
627    if has_unparsed {
628        // if we had previously unparsed bytes, we use them in combination with
629        // the new frame
630        let mut new_vec: Vec<u8> = Vec::with_capacity(num_unparsed + frame.len());
631        new_vec.extend_from_slice(unparsed_bytes);
632        new_vec.extend_from_slice(frame);
633        enqueue_messages_from_buffer(&new_vec, unparsed_bytes, unsent_messages)
634    } else {
635        // if we didn't have any previously unparsed bytes, we can process this
636        // frame directly, gaining a tiny bit of efficiency. This helps in dealing
637        // with most messages that will tend to be quite small.
638        enqueue_messages_from_buffer(frame, unparsed_bytes, unsent_messages)
639    }
640}
641
/// Processes raw data to parse and enqueue Messages.
643///
/// The logic is straightforward, even if cumbersome to look at. We run a loop, where
645/// at each iteration, we process a new line. In each line, we encounter one of the
646/// following scenarios (xxxx is a 4 digit code):
647/// 1. `xxxx<space><content>` - this is the last line in this response
648/// 2. `xxxx<minus><content>` - this is NOT the last line in this response
649/// 3. `<space><content>` - same as (2) but the xxxx code is implicitly = previous one
650///
651/// More details about the protocol can be found [here](https://gitlab.nic.cz/labs/bird/-/blob/master/nest/cli.c)
652///
653/// While processing each line, we can return an `Ok(0)` to indicate we need more
654/// data ([Connection::fetch_new_messages] takes care of that).
655#[inline]
656fn enqueue_messages_from_buffer(
657    buffer: &[u8],
658    unparsed_bytes: &mut Vec<u8>,
659    unsent_messages: &mut VecDeque<Message>,
660) -> Result<usize> {
661    let bsize = buffer.len();
662    let mut num_messages = 0;
663    let mut pos: usize = 0;
664    let mut code: u32 = 0;
665    let mut msg_start_pos = 0;
666    let mut message_size: usize = 0;
667    let mut last_msg_added_epos = 0;
668
669    // process things line by line. each iteration of this loop constitutes
670    // a new line
671    loop {
672        let line_start_pos = pos;
673        log::trace!("conn: checking if we can start processing a new line");
674        // break or ask for more data if we're at the end, but expected to parse
675        if pos >= bsize {
676            log::trace!("  need more data: position is larger than buffer size");
677            break;
678        }
679
680        // if we don't have visibility into the next newline, break or ask
681        // for more data
682        let nl_pos: usize = match buffer[pos..].iter().position(|it| *it == b'\n') {
683            Some(it) => pos + it,
684            None => {
685                log::trace!("  need more data: buffer is not terminated by newline");
686                break;
687            }
688        };
689        let next_line_pos = nl_pos + 1;
690
691        log::trace!(
692            "conn: processing line: {}",
693            String::from_utf8_lossy(&buffer[pos..nl_pos])
694        );
695
696        if buffer[pos] == b' ' {
697            log::trace!("  no code present, we're a continuation of prev line");
698            pos += 1; // we're now at start of data in this line
699            message_size += nl_pos - pos + 1; // +1 for newline
700        } else {
701            log::trace!("  line has a code, need to check if same as prev or not");
702            // the line does not start with a space, so we MUST see a code
703            // and a continuation/final marker
704            if pos + 5 >= bsize {
705                log::trace!("  need more data: trying to parse bird code but buffer does not contain all of it");
706                break;
707            }
708            let new_code = parse_code(&buffer[pos..(pos + 4)])?;
709            let separator = buffer[pos + 4];
710            log::trace!(
711                "  encountered code {} and separator '{}'",
712                new_code,
713                separator as char
714            );
715            let is_last = match separator {
716                b' ' => true,
717                b'-' => false,
718                _ => {
719                    return Err(Error::InvalidToken(InvalidTokenError::Other(format!(
720                        "unknown separator {separator} after code {new_code}"
721                    ))));
722                }
723            };
724            pos += 5; // we're now at the start of data in this line
725
726            let mut ok_added = false;
727            if is_last {
728                // if this is the last line
729                if new_code == code {
730                    log::trace!(
731                        "  determined to be the last line, but has same code as before {code}"
732                    );
733                    // treat it as continuation of the previous message
734                    message_size += nl_pos - pos + 1;
735                    let message = parse_message(code, buffer, msg_start_pos, message_size)?;
736                    log::trace!("  pushing last message {message:?}");
737                    unsent_messages.push_back(message);
738                    num_messages += 1;
739                    last_msg_added_epos = nl_pos + 1;
740                } else {
741                    log::trace!("  determined to be the last line, has new code  {code}");
742                    // treat this as a new message
743                    // we first push the prev message, if present
744                    if message_size > 0 {
745                        let message = parse_message(code, buffer, msg_start_pos, message_size)?;
746                        log::trace!("  pushing prev to last message {message:?}");
747                        unsent_messages.push_back(message);
748                        num_messages += 1;
749                        // last_msg_added_epos = nl_pos + 1; // not needed as we do this at the end anyway
750                    }
751                    // now we process the new message
752                    code = new_code;
753                    msg_start_pos = pos;
754                    let message = parse_message(code, buffer, msg_start_pos, message_size)?;
755                    log::trace!("  pushing new message {message:?}");
756                    if let Message::Ok = message {
757                        ok_added = true;
758                    }
759                    unsent_messages.push_back(message);
760                    num_messages += 1;
761                    last_msg_added_epos = nl_pos + 1;
762                }
763                if !ok_added {
764                    unsent_messages.push_back(Message::Ok);
765                }
766                break;
767            } else {
768                // if this is not the last line
769                // if this line is a continuation of the previous one
770                if new_code == code {
771                    log::trace!("  not the last line, continuing from prev line");
772                    // we just mark this line as extension of previous one
773                    message_size += nl_pos - pos + 1;
774                } else {
775                    log::trace!("  not the last line, but new code");
776                    // treat this as a new message
777                    // we first push the prev message, if present
778                    if message_size > 0 {
779                        let message = parse_message(code, buffer, msg_start_pos, message_size)?;
780                        log::trace!("  pushing new message {message:?}");
781                        unsent_messages.push_back(message);
782                        num_messages += 1;
783                        last_msg_added_epos = line_start_pos;
784                    }
785                    // now we process the new message
786                    log::trace!("  resetting markers for a new message with code {new_code}");
787                    code = new_code;
788                    message_size = nl_pos - pos;
789                    msg_start_pos = pos;
790                }
791            }
792        }
793        pos = next_line_pos; // move to the next line
794    }
795
796    // push all unprocessed bytes to self.unparsed_bytes
797    let remaining = buffer.len() - last_msg_added_epos;
798    log::trace!("conn: found {remaining} pending bytes");
799    if remaining > 0 {
800        unparsed_bytes.clear();
801        let src = &buffer[(buffer.len() - remaining)..];
802        log::trace!("conn: enqueuing pending: {}", &String::from_utf8_lossy(src));
803        unparsed_bytes.extend_from_slice(src);
804    }
805
806    log::trace!("  already enqueued {num_messages} messages");
807    Ok(num_messages)
808}
809
810/// Parse the 4 digit code at the front of a bird response
811#[inline]
812fn parse_code(buffer: &[u8]) -> Result<u32> {
813    let text = std::str::from_utf8(&buffer[0..4])?;
814    match text.parse() {
815        Ok(num) => Ok(num),
816        Err(err) => {
817            log::error!(
818                "failed to parse string to u32. this is not a u32: '{}'\n{}",
819                text,
820                err
821            );
822            Err(err.into())
823        }
824    }
825}
826
827/// Parse a [Message] and return it
828#[inline]
829fn parse_message(code: u32, buffer: &[u8], start_pos: usize, msg_size: usize) -> Result<Message> {
830    let mut v: Vec<u8> = Vec::with_capacity(msg_size);
831    let mut idx = 0;
832    let mut pos = start_pos;
833    while pos < buffer.len() {
834        if idx > 0 {
835            pos += match buffer[pos] {
836                b' ' => 1,
837                _ => 5,
838            };
839            v.push(b'\n'); // this is for the newline needed after previous line
840        }
841        idx += 1;
842
843        if let Some(nl_pos) = buffer[pos..].iter().position(|it| *it == b'\n') {
844            let src = &buffer[pos..(pos + nl_pos)];
845            v.extend_from_slice(src);
846            pos += src.len() + 1;
847            if v.len() == msg_size {
848                break;
849            }
850        } else {
851            // we don't see a new line, so this must be the last line
852            // so we break at the end of this
853            let src = &buffer[pos..];
854            v.extend_from_slice(src);
855            break;
856        }
857    }
858    Ok(Message::from_code(code, std::str::from_utf8(&v)?))
859}
860
/// Frame size for reads: data is read in chunks of this size.
///
/// [Connection] also reserves twice this many bytes up front for its buffer
/// of received-but-not-yet-parsed data.
pub const READ_FRAME_SIZE: usize = 2048;
863
864/// Unit tests
865#[cfg(test)]
866mod tests {
867    use super::*;
868    use crate::*;
869
870    /// Handy fn for removing the indents from a multi-line string
871    fn heredoc(s: &str) -> String {
872        let indent = if let Some(line2) = s.split('\n').nth(2) {
873            line2.find(char::is_alphanumeric).unwrap_or(0)
874        } else {
875            0
876        };
877        s.lines()
878            .map(|x| (if x.starts_with(' ') { &x[indent..] } else { x }).into())
879            .collect::<Vec<String>>()
880            .join("\n")
881    }
882
    /// The bird response that we test against
    ///
    /// A canned session: the `0001` greeting, the echoed `show interfaces`
    /// command, three interfaces (`lo`, `eth0`, `eth1`) each made of a `1001`,
    /// `1004` and multi-line `1003` entry, and the closing `0000 ` line.
    ///
    /// The literal is passed through `heredoc` so the source indentation is
    /// not part of the returned text. Continuation lines carry one extra
    /// leading space, which survives that step. The trailing space after
    /// `0000` is part of the text and is relied on by
    /// `test_multi_line_parsing_at_end`.
    fn get_test_text() -> String {
        // Result deliberately ignored; presumably `try_init` fails when another
        // test has already installed the logger.
        let _ = env_logger::try_init();

        heredoc(
            "0001 BIRD 2.0.7 ready.
            show interfaces
            1001-lo up (index=1)
            1004-\tMultiAccess AdminUp LinkUp Loopback Ignored MTU=65536
            1003-\t127.0.0.1/8 (Preferred, scope host)
             \t::1/128 (Preferred, scope host)
            1001-eth0 up (index=2)
            1004-\tMultiAccess Broadcast Multicast AdminUp LinkUp MTU=9000
            1003-\t172.30.0.12/16 (Preferred, scope site)
             \t172.29.1.15/32 (scope univ)
             \t172.29.1.16/32 (scope univ)
             \t172.29.1.17/32 (scope univ)
             \tfe80::4495:80ff:fe71:a791/64 (Preferred, scope link)
             \tfe80::4490::72/64 (scope univ)
            1001-eth1 up (index=3)
            1004-\tMultiAccess Broadcast Multicast AdminUp LinkUp MTU=1500
            1003-\t169.254.199.2/30 (Preferred, opposite 169.254.199.1, scope univ)
             \tfe80::a06f:7ff:fea7:c662/64 (Preferred, scope link)
             \tfe80:169:254:199::2/126 (scope link)
            0000 
            ",
        )
    }
911
912    /// Returns the nth position match as determined by `op`
913    fn get_nth_pos<F>(text: &str, n: u32, op: F) -> usize
914    where
915        F: Fn(&str) -> Option<usize>,
916    {
917        let mut pos: usize = 0;
918        let mut num_match: u32 = 0;
919        for line in text.lines() {
920            if let Some(x) = op(line) {
921                num_match += 1;
922                if num_match == n {
923                    pos += x;
924                    break;
925                }
926            }
927            pos += line.len() + 1;
928        }
929        pos
930    }
931
932    /// Tests parsing of a single line response
933    #[test]
934    fn test_single_line_parsing() {
935        let text = get_test_text();
936        let needle = "lo up (index=1)";
937        let buffer = text.as_bytes();
938        let start_pos = text.find(needle).unwrap();
939        let message = parse_message(1001, buffer, start_pos, needle.len())
940            .expect("should not have failed parsing");
941        if let Message::InterfaceList(s) = message {
942            assert_eq!(s, needle);
943        } else {
944            panic!("incorrect message type {message:?}");
945        }
946    }
947
948    /// Tests parsing of a multi-line response
949    #[test]
950    fn test_multi_line_parsing() {
951        let text = get_test_text();
952
953        let start_pos = get_nth_pos(&text, 1, |x| {
954            if x.ends_with("MTU=9000") {
955                Some(x.len())
956            } else {
957                None
958            }
959        }) + 6;
960        let end_pos = get_nth_pos(&text, 3, |x| {
961            if x.starts_with("1001-") {
962                Some(0)
963            } else {
964                None
965            }
966        }) - 1;
967        let buffer = text.as_bytes();
968        let msg_size = end_pos
969            - start_pos
970            - unsafe {
971                std::str::from_utf8_unchecked(&buffer[start_pos..end_pos])
972                    .matches('\n')
973                    .count()
974            };
975        let message = parse_message(1003, buffer, start_pos, msg_size)
976            .expect("should not have failed parsing");
977        if let Message::InterfaceAddress(s) = message {
978            assert!(s.starts_with("\t172.30.0.12"));
979            assert!(s.contains("\n\t172.29.1.15/32 (scope univ)\n"));
980            assert!(s.contains("\n\t172.29.1.16/32 (scope univ)\n"));
981            assert!(s.contains("\n\t172.29.1.17/32 (scope univ)\n"));
982            assert!(s.ends_with("fe80::4490::72/64 (scope univ)"));
983            assert!(!s.ends_with('\n'));
984        } else {
985            panic!("incorrect message type {message:?}");
986        }
987    }
988
989    /// Tests parsing of a multi-line response that's at the very end
990    #[test]
991    fn test_multi_line_parsing_at_end() {
992        let text = get_test_text().replace("\n0000 \n", "");
993        let start_pos = get_nth_pos(&text, 3, |x| {
994            if x.starts_with("1003-") {
995                Some(0)
996            } else {
997                None
998            }
999        }) + 5;
1000        let end_pos = text.len();
1001        let buffer = text.as_bytes();
1002        let msg_size = end_pos
1003            - start_pos
1004            - unsafe {
1005                std::str::from_utf8_unchecked(&buffer[start_pos..end_pos])
1006                    .matches('\n')
1007                    .count()
1008            };
1009        let message = parse_message(1003, buffer, start_pos, msg_size)
1010            .expect("should not have failed parsing");
1011        if let Message::InterfaceAddress(s) = message {
1012            assert!(s.starts_with("\t169.254.199.2"));
1013            assert!(s.contains("\n\tfe80::a06f:7ff:fea7:c662/64 (Preferred, scope link)\n"));
1014            assert!(s.contains("\n\tfe80:169:254:199::2/126 (scope link)"));
1015            assert!(!s.ends_with('\n'));
1016        } else {
1017            panic!("incorrect message type {message:?}");
1018        }
1019    }
1020}