syslog_parse 1.0.2

parse syslog,check syslog format
Documentation
use crate::protocol::single_parse;
use chrono::{DateTime, Duration, Local};
use nom::bytes::complete::take_until;
use nom::error::{Error, ParseError};

const DEFAULT_MAX_SIZE: usize = 1000;

#[derive(Debug)]
pub struct MsgStream {
    pub last_msg: String,
    pub res: Vec<u8>,
    max_size: usize,
    last_parse_time: DateTime<Local>,
}

impl Default for MsgStream {
    fn default() -> Self {
        MsgStream {
            last_msg: "".to_string(),
            res: vec![],
            max_size: DEFAULT_MAX_SIZE,
            last_parse_time: Local::now(),
        }
    }
}

impl MsgStream {
    pub fn set_max_size(&mut self, size: usize) {
        self.max_size = size;
    }

    pub fn clear(&mut self) {
        self.last_msg.clear();
        self.res.clear();
    }

    pub fn multi_parse<'a, T: ParseError<&'a str>>(
        &mut self,
        value: Vec<u8>,
    ) -> Result<Vec<String>, T> {
        self.last_parse_time = Local::now();
        let next_msg = if self.last_msg.ends_with('<') {
            self.last_msg.pop();
            ["<".as_bytes().to_vec(), self.res.clone(), value].concat()
        } else {
            [self.res.clone(), value].concat()
        };
        let next_msg = String::from_utf8_lossy(&next_msg);
        let mut next_msg = next_msg.as_ref();
        let mut logs = Vec::new();
        loop {
            match take_until::<&str, &str, Error<&str>>("<")(next_msg) {
                Ok((res, msg)) => {
                    self.last_msg += msg;
                    next_msg = res;
                    if let Ok((_, msg)) = single_parse(next_msg) {
                        next_msg = msg.msg;
                        if !self.last_msg.is_empty() {
                            logs.push(self.last_msg.clone());
                            self.last_msg.clear();
                        }
                        self.last_msg += &msg.header;
                        if logs.len().ge(&self.max_size) {
                            self.res = next_msg.as_bytes().to_vec();
                            return Ok(logs);
                        }
                    } else {
                        let first = next_msg.chars().next().unwrap_or_default();
                        self.last_msg.push(first);
                        next_msg = &next_msg[1..next_msg.len()];
                    }
                }
                Err(_) => {
                    self.res = next_msg.as_bytes().to_vec();
                    return Ok(logs);
                }
            }
        }
    }
}

impl Iterator for MsgStream {
    type Item = String;

    fn next(&mut self) -> Option<Self::Item> {
        let now = Local::now();
        let next_msg = if self.last_msg.ends_with('<') {
            self.last_msg.pop();
            ["<".as_bytes().to_vec(), self.res.clone()].concat()
        } else {
            self.res.clone()
        };
        let next_msg = String::from_utf8_lossy(&next_msg);
        let mut next_msg = next_msg.as_ref();
        loop {
            match take_until::<&str, &str, Error<&str>>("<")(next_msg) {
                Ok((res, msg)) => {
                    self.last_msg += msg;
                    next_msg = res;
                    if let Ok((_, msg)) = single_parse(next_msg) {
                        next_msg = msg.msg;
                        if !self.last_msg.is_empty() {
                            let last_msg = self.last_msg.clone();
                            self.last_msg = msg.header;
                            return Some(last_msg);
                        }
                    } else {
                        let first = next_msg.chars().next().unwrap_or_default();
                        self.last_msg.push(first);
                        next_msg = &next_msg[1..next_msg.len()];
                    }
                }
                Err(_) => {
                    self.res = next_msg.as_bytes().to_vec();
                    break;
                }
            }
        }

        let interval = now.signed_duration_since(self.last_parse_time);
        if interval.ge(&Duration::milliseconds(500)) && !self.last_msg.is_empty() {
            let msg = format!("{}{}", self.last_msg, String::from_utf8_lossy(&self.res));
            self.clear();
            return Some(msg);
        }
        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_1() {
        let mut parsed = MsgStream::default();
        let value = "<11>1 2023-09-07T09:45:08.899012Z localhost <90>myprogram5424 42 9533322 - "
            .as_bytes()
            .to_vec();
        let msg = parsed.multi_parse::<Error<&str>>(value).unwrap();
        assert_eq!(msg.len(), 0);

        let value = "hello world<11>1 2023-09-07T09:45:08.899092Z localhost <90>myprogram5424 42 1545121 - tcp 传输 syslog<11>1 2023-09-07T09:45:08.899092Z localhost <90>myprogram5424 42 1545121 - tcp 传输 syslog2323".as_bytes().to_vec();
        let msg = parsed.multi_parse::<Error<&str>>(value).unwrap();
        assert_eq!(msg.len(), 2);

        parsed.last_parse_time = Local::now()
            .checked_sub_signed(Duration::seconds(10))
            .unwrap();
        let msg = parsed.next().unwrap();
        assert_eq!(msg.len(), 96);
    }

    #[test]
    fn test_2() {
        let mut parsed = MsgStream::default();
        parsed.last_msg =
            "<11>1 2023-09-07T09:45:08.899012Z localhost <90>myprogram5424 42 9533322 - "
                .to_string();
        parsed.res = "hello world<46>1 2023-09-13T11:15".as_bytes().to_vec();

        let value = ":47.697131Z localhost dygen 48747 185 - hello world<46>1 2023-09-13T11:15:47.707412Z localhost dygen 48747 186 - hello world<46>1 2023-09-13T11:15:47.719776Z localhost dygen 48747 187 - hello world<46>1 2023-09-13T11:15:47.731068Z localhost dygen 48747 188 - hello world<46>1 2023-09-13T11:15:47.741438Z localhost dygen 48747 189 - hello world<46>1 2023-09-13T11:15:47.752566Z localhost dygen 48747 190 - hello world<46>1 2023-09-13T11:15:47.765048Z localhost dygen 48747 191 - hello world<46>1 2023-09-13T11:15:47.775234Z localhost dygen 48747 192 - hello world<46>1 2023-09-13T11:15:47.786644Z localhost dygen 48747 193 - hello world<46>1 2023-09-13T11:15:47.796719Z localhost dygen 48747 194 - hello world<46>1 2023-09-13T11:15:47.808883Z localhost dygen 48747 195 - hello world<46>1 2023-09-13T11:15:47.819102Z localhost dygen 48747 196 - hello world<46>1 2023-09-13T11:15:47.830662Z localhost dygen 48747 197 - hello world<46>1 2023-09-13T11:15:47.842487Z localhost dygen 48747 198 - hello world<46>1 2023-09-13T11:15:4".as_bytes().to_vec();
        let msg = parsed.multi_parse::<Error<&str>>(value).unwrap();
        assert_eq!(msg.len(), 14);
        assert_eq!(msg[0].len(), 86);
    }

    #[test]
    fn test_3() {
        let mut parsed = MsgStream::default();
        parsed.last_msg =
            "<11>1 2023-09-07T09:45:08.899012Z localhost <90>myprogram5424 42 9533322 - "
                .to_string();
        parsed.res = "hello world<46>1 202".as_bytes().to_vec();
        parsed.set_max_size(10);

        let value = "3-09-13T11:36:20.127738Z localhost dygen 50113 199 - hello world<46>1 2023-09-13T11:36:20.139313Z localhost dygen 50113 200 - hello world<46>1 2023-09-13T11:36:20.151548Z localhost dygen 50113 201 - hello world<46>1 2023-09-13T11:36:20.163635Z localhost dygen 50113 202 - hello world<46>1 2023-09-13T11:36:20.174879Z localhost dygen 50113 203 - hello world<46>1 2023-09-13T11:36:20.18724Z localhost dygen 50113 204 - hello world<46>1 2023-09-13T11:36:20.198862Z localhost dygen 50113 205 - hello world<46>1 2023-09-13T11:36:20.20901Z localhost dygen 50113 206 - hello world<46>1 2023-09-13T11:36:20.221036Z localhost dygen 50113 207 - hello world<46>1 2023-09-13T11:36:20.232828Z localhost dygen 50113 208 - hello world<46>1 2023-09-13T11:36:20.245382Z localhost dygen 50113 209 - hello world<46>1 2023-09-13T11:36:20.255931Z localhost dygen 50113 210 - hello world<46>1 2023-09-13T11:36:20.266635Z localhost dygen 50113 211 - hello world<46>1 2023-09-13T11:36:20.278109Z localhost dygen 50113 212 - hello world<46>1 2023-09";
        let msg = parsed
            .multi_parse::<Error<&str>>(value.as_bytes().to_vec())
            .unwrap();
        assert_eq!(msg.len(), 10);
        assert_eq!(msg[0].len(), 86);
    }

    #[test]
    fn test_4() {
        let mut parsed = MsgStream::default();
        parsed.last_msg =  "<11>1 2023-09-07T09:45:08.899012Z localhost <90>myprogram5424 42 9533322 - hello world<"
            .to_string();
        parsed.res = "46>1 2023-09-13T11:49:59.72".as_bytes().to_vec();

        let value = "0778Z localhost dygen 51187 437 - hello world<46>1 2023-09-13T11:49:59.732237Z localhost dygen 51187 438 - hello world<46>1 2023-09-13T11:49:59.743385Z localhost dygen 51187 439 - hello world<46>1 2023-09-13T11:49:59.755518Z localhost dygen 51187 440 - hello world<46>1 2023-09-13T11:49:59.766871Z localhost dygen 51187 441 - hello world<46>1 2023-09-13T11:49:59.777432Z localhost dygen 51187 442 - hello world<46>1 2023-09-13T11:49:59.78831Z localhost dygen 51187 443 - hello world<46>1 2023-09-13T11:49:59.8Z localhost dygen 51187 444 - hello world<46>1 2023-09-13T11:49:59.81139Z localhost dygen 51187 445 - hello world<46>1 2023-09-13T11:49:59.821541Z localhost dygen 51187 446 - hello world<46>1 2023-09-13T11:49:59.833128Z localhost dygen 51187 447 - hello world<46>1 2023-09-13T11:49:59.845375Z localhost dygen 51187 448 - hello world<46>1 2023-09-13T11:49:59.857092Z localhost dygen 51187 449 - hello world<46>1 2023-09-13T11:49:59.869078Z localhost dygen 51187 450 - hello world<46>1 2023-09-13T11:49:59.881514Z";
        let msg = parsed
            .multi_parse::<Error<&str>>(value.as_bytes().to_vec())
            .unwrap();
        assert_eq!(msg.len(), 14);
        assert_eq!(msg[0].len(), 86);
    }

    #[test]
    fn test_5() {
        let mut parsed = MsgStream::default();
        parsed.last_msg =  "<11>1 2023-09-07T09:45:08.899012Z localhost <90>myprogram5424 42 9533322 - hello world".to_string();
        parsed.res = "<46>1 2023-09-13T12:14:23.376067Z localhost dygen 52828 "
            .as_bytes()
            .to_vec();

        let value = "746 - hello world<46>1 2023-09-13T12:14:23.388603Z localhost dygen 52828 747 - hello world<46>1 2023-09-13T12:14:23.398801Z localhost dygen 52828 748 - hello world<46>1 2023-09-13T12:14:23.40943Z localhost dygen 52828 749 - hello world<46>1 2023-09-13T12:14:23.421159Z localhost dygen 52828 750 - hello world<46>1 2023-09-13T12:14:23.43254Z localhost dygen 52828 751 - hello world<46>1 2023-09-13T12:14:23.444639Z localhost dygen 52828 752 - hello world<46>1 2023-09-13T12:14:23.455732Z localhost dygen 52828 753 - hello world<46>1 2023-09-13T12:14:23.467854Z localhost dygen 52828 754 - hello world<46>1 2023-09-13T12:14:23.478999Z localhost dygen 52828 755 - hello world<46>1 2023-09-13T12:14:23.489494Z localhost dygen 52828 756 - hello world<46>1 2023-09-13T12:14:23.501359Z localhost dygen 52828 757 - hello world<46>1 2023-09-13T12:14:23.512624Z localhost dygen 52828 758 - hello world<46>1 2023-09-13T12:14:23.524783Z localhost dygen 52828 759 - hello world<46>1 2023-09-13T12:14:23.536167Z localhost dygen 52828 760 ";
        let msg = parsed
            .multi_parse::<Error<&str>>(value.as_bytes().to_vec())
            .unwrap();
        assert_eq!(msg.len(), 14);
        assert_eq!(msg[0].len(), 86);
    }

    #[test]
    fn test_6() {
        let mut parsed = MsgStream::default();
        parsed.last_msg =  "<11>1 2023-09-07T09:45:08.899012Z localhost <90>myprogram5424 42 9533322 - hello world".to_string();
        parsed.res = "<46>1                                    "
            .as_bytes()
            .to_vec();

        let value = "2023-09-13T15:39:43.346956Z localhost dygen 60458 901 - hello world<46>1 2023-09-13T15:39:43.357974Z localhost dygen 60458 902 - hello world<46>1 2023-09-13T15:39:43.368058Z localhost dygen 60458 903 - hello world<46>1 2023-09-13T15:39:43.378762Z localhost dygen 60458 904 - hello world<46>1 2023-09-13T15:39:43.388876Z localhost dygen 60458 905 - hello world<46>1 2023-09-13T15:39:43.401149Z localhost dygen 60458 906 - hello world<46>1 2023-09-13T15:39:43.412786Z localhost dygen 60458 907 - hello world<46>1 2023-09-13T15:39:43.423378Z localhost dygen 60458 908 - hello world<46>1 2023-09-13T15:39:43.433474Z localhost dygen 60458 909 - hello world<46>1 2023-09-13T15:39:43.444026Z localhost dygen 60458 910 - hello world<46>1 2023-09-13T15:39:43.456378Z localhost dygen 60458 911 - hello world<46>1 2023-09-13T15:39:43.466604Z localhost dygen 60458 912 - hello world<46>1 2023-09-13T15:39:43.477399Z localhost dygen 60458 913 - hello world<46>1 2023-09-13T15:39:43.488605Z localhost dygen 60458 914 - hello world<46>1 20";
        let msg = parsed
            .multi_parse::<Error<&str>>(value.as_bytes().to_vec())
            .unwrap();
        assert_eq!(msg.len(), 14);
        assert_eq!(msg[0].len(), 86);
    }

    #[test]
    fn test_7() {
        let mut parsed = MsgStream::default();
        parsed.last_msg =  "<11>1 2023-09-07T09:45:08.899012Z localhost <90>myprogram5424 42 9533322 - hello world".to_string();
        parsed.res = "<46>1 2023-09-14T03:06:27.889916Z localho"
            .as_bytes()
            .to_vec();

        let value = "st dygen 71326 29 - hello world<46>1 2023-09-14T03:06:27.900082Z localhost dygen 71326 30 - hello world<46>1 2023-09-14T03:06:27.912639Z localhost dygen 71326 31 - hello world<46>1 2023-09-14T03:06:27.923987Z localhost dygen 71326 32 - hello world<46>1 2023-09-14T03:06:27.934671Z localhost dygen 71326 33 - hello world<46>1 2023-09-14T03:06:27.947075Z localhost dygen 71326 34 - hello world<46>1 2023-09-14T03:06:27.95862Z localhost dygen 71326 35 - hello world<46>1 2023-09-14T03:06:27.970422Z localhost dygen 71326 36 - hello world<46>1 2023-09-14T03:06:27.982756Z localhost dygen 71326 37 - hello world<46>1 2023-09-14T03:06:27.99446Z localhost dygen 71326 38 - hello world<46>1 2023-09-14T03:06:28.005502Z localhost dygen 71326 39 - hello world<46>1 2023-09-14T03:06:28.016147Z localhost dygen 71326 40 - hello world<46>1 2023-09-14T03:06:28.027632Z localhost dygen 71326 41 - hello world<46>1 2023-09-14T03:06:28.03863Z localhost dygen 71326 42 - hello world<46>1 2023-09-14T03:06:28.049456Z localhost dygen 71326 43 -";
        let msg = parsed
            .multi_parse::<Error<&str>>(value.as_bytes().to_vec())
            .unwrap();
        assert_eq!(msg.len(), 15);
        assert_eq!(msg[0].len(), 86);
    }

    #[test]
    fn test_8() {
        let mut parsed = MsgStream::default();
        parsed.last_msg = "<11>1 2023-09-07T09:45:08.899012Z localhost <90>myprogram5424 42 9533322 - hello world".to_string();
        parsed.res = "<46>1 202".as_bytes().to_vec();

        let value = "3-09-14T03:06:30.292049Z localhost dygen 71326 241 - hello world<46>1 2023-09-14T03:06:30.302195Z localhost dygen 71326 242 - hello world<46>1 2023-09-14T03:06:30.314143Z localhost dygen 71326 243 - hello world<46>1 2023-09-14T03:06:30.325347Z localhost dygen 71326 244 - hello world<46>1 2023-09-14T03:06:30.336867Z localhost dygen 71326 245 - hello world<46>1 2023-09-14T03:06:30.3483Z localhost dygen 71326 246 - hello world<46>1 2023-09-14T03:06:30.3599Z localhost dygen 71326 247 - hello world<46>1 2023-09-14T03:06:30.371696Z localhost dygen 71326 248 - hello world<46>1 2023-09-14T03:06:30.383623Z localhost dygen 71326 249 - hello world<46>1 2023-09-14T03:06:30.394759Z localhost dygen 71326 250 - hello world<46>1 2023-09-14T03:06:30.40684Z localhost dygen 71326 251 - hello world<46>1 2023-09-14T03:06:30.418307Z localhost dygen 71326 252 - hello world<46>1 2023-09-14T03:06:30.430398Z localhost dygen 71326 253 - hello world<46>1 2023-09-14T03:06:30.441497Z localhost dygen 71326 254 - hello world<46>1 2023-09-14";
        let msg = parsed
            .multi_parse::<Error<&str>>(value.as_bytes().to_vec())
            .unwrap();
        assert_eq!(msg.len(), 14);
        assert_eq!(msg[0].len(), 86);
    }
}