1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use crate::io::AsyncCacheRead;
use pin_project_lite::pin_project;
use regex::bytes::*;
use std::future::Future;
use std::io;
use std::io::ErrorKind;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::ReadBuf;

pin_project! {
    /// The delimiter is included in the resulting vector.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadUntilRegex<'a, R: ?Sized> {
        reader: &'a mut R,
        regex: Regex,
        buf: &'a mut Vec<u8>,
        internal_buf: Vec<u8>,
        // The number of bytes appended to buf. This can be less than buf.len() if
        // the buffer was not empty when the operation was started.
        read: usize,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}

pub(crate) fn read_until_regex<'a, R>(
    reader: &'a mut R,
    pattern: &str,
    buf: &'a mut Vec<u8>,
) -> Result<ReadUntilRegex<'a, R>, regex::Error>
where
    R: AsyncCacheRead + ?Sized + Unpin,
{
    let regex = Regex::new(pattern)?;
    Ok(ReadUntilRegex {
        reader,
        regex,
        buf,
        internal_buf: Vec::new(),
        read: 0,
        _pin: PhantomPinned,
    })
}

fn eof() -> io::Error {
    io::Error::new(ErrorKind::UnexpectedEof, "early eof")
}

pub(super) fn read_until_regex_internal<R: AsyncCacheRead + ?Sized>(
    mut reader: Pin<&mut R>,
    cx: &mut Context<'_>,
    regex: &mut Regex,
    buf: &mut Vec<u8>,
    internal_buf: &mut Vec<u8>,
    read: &mut usize,
) -> Poll<io::Result<(usize, usize)>> {
    let mut read_buf = [0u8; 4096];
    let mut data = ReadBuf::new(&mut read_buf);
    loop {
        data.clear();
        ready!(reader.as_mut().poll_reader(cx, &mut data))?;
        let read_len = data.filled().len();
        if read_len == 0 {
            return Err(eof()).into();
        }
        *read += read_len;
        internal_buf.extend_from_slice(data.filled());

        match regex.find(&internal_buf) {
            Some(m) => {
                let drain_index = m.end();
                buf.extend_from_slice(&internal_buf[..drain_index]);
                let restore_data = &internal_buf[drain_index..];
                reader.restore(restore_data);
                *read -= restore_data.len();
                return Poll::Ready(Ok((buf.len(), m.len())));
            }
            None => {}
        }
    }
}

impl<R: AsyncCacheRead + ?Sized + Unpin> Future for ReadUntilRegex<'_, R> {
    type Output = io::Result<(usize, usize)>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let me = self.project();
        read_until_regex_internal(
            Pin::new(*me.reader),
            cx,
            me.regex,
            me.buf,
            me.internal_buf,
            me.read,
        )
    }
}