1use core::str::Utf8Error;
4
5use bytes::{Buf, Bytes, BytesMut};
6use bytes_utils::Str;
7
8use crate::constants::{CR, LF};
9
/// One line of an event stream, borrowing its bytes from the input slice.
///
/// Names and values are raw bytes; nothing in this type guarantees that they
/// are valid UTF-8.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RawEventLine<'a> {
    /// A line whose first byte is `:`.
    Comment,
    /// A non-empty line that is not a comment.
    Field {
        /// The bytes before the first `:`, or the whole line when it has no `:`.
        field_name: &'a [u8],
        /// The bytes after the first `:`, minus at most one leading space.
        /// `None` when the line has no `:` at all.
        field_value: Option<&'a [u8]>,
    },
    /// A line that contains no bytes.
    Empty,
}
20
21#[derive(Debug, Clone)]
23pub enum RawEventLineOwned {
24 Comment,
25 Empty,
26 Field {
27 field_name: Bytes,
28 field_value: Option<Bytes>,
29 },
30}
31
/// The field names that validation distinguishes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FieldName {
    /// The field name was exactly `event`.
    Event,
    /// The field name was exactly `data`.
    Data,
    /// The field name was exactly `id`.
    Id,
    /// The field name was exactly `retry`.
    Retry,
    /// Any other field name.
    Ignored,
}
41
42#[derive(Debug, Clone)]
44pub enum ValidatedEventLine {
45 Comment,
46 Empty,
47 Field {
48 field_name: FieldName,
49 field_value: Option<Str>,
50 },
51}
52
53fn validate_bytes(val: Bytes) -> Result<Str, Utf8Error> {
54 match str::from_utf8(val.as_ref()) {
55 Ok(_) => Ok(unsafe { Str::from_inner_unchecked(val) }),
56 Err(e) => Err(e),
57 }
58}
59
60impl RawEventLineOwned {
61 pub fn validate(self) -> Result<ValidatedEventLine, core::str::Utf8Error> {
62 match self {
63 RawEventLineOwned::Comment => Ok(ValidatedEventLine::Comment),
64 RawEventLineOwned::Empty => Ok(ValidatedEventLine::Empty),
65 RawEventLineOwned::Field {
66 field_name,
67 field_value,
68 } => {
69 let field_name = match field_name.as_ref() {
70 b"event" => FieldName::Event,
71 b"data" => FieldName::Data,
72 b"id" => FieldName::Id,
73 b"retry" => FieldName::Retry,
74 _ => FieldName::Ignored,
75 };
76
77 let field_value = match field_value {
78 Some(b) => Some(validate_bytes(b)?),
79 None => None,
80 };
81
82 Ok(ValidatedEventLine::Field {
83 field_name,
84 field_value,
85 })
86 }
87 }
88 }
89}
90
91fn find_eol(bytes: &[u8]) -> Option<(usize, usize)> {
94 let first_match = memchr::memchr2(CR, LF, bytes)?;
95
96 match bytes[first_match] {
97 LF => Some((first_match, first_match + 1)),
98 CR => {
99 if first_match + 1 >= bytes.len() {
100 return None; }
102
103 if bytes[first_match + 1] == LF {
105 Some((first_match, first_match + 2))
106 } else {
107 Some((first_match, first_match + 1))
109 }
110 }
111 _ => unreachable!(),
112 }
113}
114
115fn split_at_next_eol(bytes: &[u8]) -> Option<(&[u8], &[u8])> {
117 find_eol(bytes).map(|(line_end, rem_start)| (&bytes[..line_end], &bytes[rem_start..]))
118}
119
120fn read_line(bytes: &[u8]) -> RawEventLine<'_> {
121 match memchr::memchr(b':', bytes) {
122 Some(colon_pos) => {
123 if colon_pos == 0 {
124 RawEventLine::Comment
125 } else {
126 let value = &bytes[colon_pos + 1..];
127 let value = match value {
131 [b' ', rest @ ..] => rest,
132 _ => value,
133 };
134 RawEventLine::Field {
135 field_name: &bytes[..colon_pos],
136 field_value: Some(value),
137 }
138 }
139 }
140 None => {
141 if bytes.is_empty() {
142 RawEventLine::Empty
143 } else {
144 RawEventLine::Field {
145 field_name: bytes,
146 field_value: None,
147 }
148 }
149 }
150 }
151}
152
153pub fn parse_line(bytes: &[u8]) -> Option<(RawEventLine<'_>, &[u8])> {
155 let (line_to_read, next) = split_at_next_eol(bytes)?;
156 Some((read_line(line_to_read), next))
157}
158
159pub fn parse_line_from_buffer(buffer: &mut BytesMut) -> Option<RawEventLineOwned> {
162 let (line_end, rem_start) = find_eol(buffer)?;
163
164 let line = buffer.split_to(line_end).freeze();
165 buffer.advance(rem_start - line_end);
166
167 if line.is_empty() {
168 return Some(RawEventLineOwned::Empty);
169 }
170
171 match memchr::memchr(b':', &line) {
172 Some(0) => Some(RawEventLineOwned::Comment),
173 Some(colon_pos) => {
174 let value_start = if line.get(colon_pos + 1) == Some(&b' ') {
175 colon_pos + 2
176 } else {
177 colon_pos + 1
178 };
179 Some(RawEventLineOwned::Field {
180 field_name: line.slice(..colon_pos),
181 field_value: Some(line.slice(value_start..)),
182 })
183 }
184 None => Some(RawEventLineOwned::Field {
185 field_name: line,
186 field_value: None,
187 }),
188 }
189}
190
191pub fn parse_line_from_bytes(buffer: &mut Bytes) -> Option<RawEventLineOwned> {
192 let (line_end, rem_start) = find_eol(buffer)?;
193
194 let line = buffer.split_to(line_end);
195 buffer.advance(rem_start - line_end);
196
197 if line.is_empty() {
198 return Some(RawEventLineOwned::Empty);
199 }
200
201 match memchr::memchr(b':', &line) {
202 Some(0) => Some(RawEventLineOwned::Comment),
203 Some(colon_pos) => {
204 let value_start = if line.get(colon_pos + 1) == Some(&b' ') {
205 colon_pos + 2
206 } else {
207 colon_pos + 1
208 };
209 Some(RawEventLineOwned::Field {
210 field_name: line.slice(..colon_pos),
211 field_value: Some(line.slice(value_start..)),
212 })
213 }
214 None => Some(RawEventLineOwned::Field {
215 field_name: line,
216 field_value: None,
217 }),
218 }
219}