1#![doc = include_str!("../README.md")]
2#![warn(
3 missing_docs,
4 missing_debug_implementations,
5 missing_copy_implementations,
6 trivial_casts,
7 trivial_numeric_casts,
8 unused_extern_crates,
9 unused_import_braces,
10 unused_qualifications,
11 variant_size_differences
12)]
13
14use std::{
15 io::{BufRead, Cursor, Read},
16 ops::Range,
17};
18
19use quick_xml::{Reader, Writer, events::Event};
20use regex::bytes::Regex;
21use thiserror::Error;
22
23pub fn filter_xmls(input: impl BufRead, xpath: Option<&str>) {
25 let xml_extractor = XmlExtractor::<_, 1024>::new(input, None).into_iter();
26 if let Some(xpath) = xpath {
27 for entry in xml_extractor.map(|xml| {
28 xml.map(|xml| {
29 use amxml::dom::*;
30 let doc = new_document(&xml).expect("well formed XML");
31 let root = doc.root_element();
32 let result = root.eval_xpath(xpath).expect("XPath expression");
33
34 (0..result.len())
35 .filter_map(|i| result.get_item(i).as_nodeptr())
36 .map(|item| item.to_string())
37 .collect::<Vec<_>>()
38 .join("\n")
39 })
40 }) {
41 println!("{}", entry.expect("next xml"));
42 }
43 } else {
44 for entry in xml_extractor {
45 println!("{}", entry.expect("next xml"));
46 }
47 }
48}
49
/// Extracts XML fragments from a [`BufRead`] input.
///
/// The extractor itself only stores its configuration; the work is done by
/// the iterator obtained through its [`IntoIterator`] implementation.
/// `B` is the size, in bytes, of the internal read buffer (1024 by default).
#[cfg_attr(feature = "debug", derive(Debug))]
pub struct XmlExtractor<I: BufRead, const B: usize = 1024> {
    /// Source stream; kept in an `Option` and moved out by `into_iter`.
    input: Option<I>,
    /// Fixed-size buffer the input is read into.
    buffer: [u8; B],
    /// Optional pattern marking the start of a new log entry. When `None`,
    /// `into_iter` tries to pick one from the first bytes of the input.
    log_entry_regex: Option<Regex>,
}
57
58impl<I: BufRead, const B: usize> XmlExtractor<I, B> {
59 pub fn new(input: I, log_entry_regex: Option<Regex>) -> XmlExtractor<I, B> {
61 XmlExtractor {
62 input: Some(input),
63 buffer: [0; B],
64 log_entry_regex,
65 }
66 }
67}
68
69impl<I: BufRead, const B: usize> IntoIterator for XmlExtractor<I, B> {
70 type Item = Result<String, XmlExtractorError>;
71 type IntoIter = XmlExtractorIter<I, B>;
72
73 fn into_iter(self) -> Self::IntoIter {
74 const LOG_ENTRY_DATE_MIN_LEN: usize = 19;
75 let Self {
76 mut input,
77 mut buffer,
78 log_entry_regex,
79 } = self;
80 let Some(mut input) = input.take() else {
81 panic!("Input stream is empty");
82 };
83 let mut total_count = 0;
84 let mut eof = false;
85 while let Ok(count) = input.read(&mut buffer[total_count..]) {
86 if count == 0 {
87 eof = true;
88 break;
89 }
90 total_count += count;
91 if total_count > LOG_ENTRY_DATE_MIN_LEN {
92 break;
93 }
94 }
95 let log_entry_regex = log_entry_regex.or_else(|| {
96 const LOG_ENTRY_DATE_REGEX: &str = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}";
97 const LOG_NEW_ENTRY_DATE_REGEX: &str = r"\n\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}";
98 const LOG_ENTRY_NON_WS_REGEX: &str = r"^\S";
99 const LOG_NEW_ENTRY_NON_WS_REGEX: &str = r"\n\S";
100
101 if total_count > LOG_ENTRY_DATE_MIN_LEN
102 && Regex::new(LOG_ENTRY_DATE_REGEX)
103 .expect("valid regex")
104 .is_match(&buffer[..total_count])
105 {
106 Some(Regex::new(LOG_NEW_ENTRY_DATE_REGEX).expect("valid regex"))
107 } else if Regex::new(LOG_ENTRY_NON_WS_REGEX)
108 .expect("valid regex")
109 .is_match(&buffer[..total_count])
110 {
111 Some(Regex::new(LOG_NEW_ENTRY_NON_WS_REGEX).expect("valid regex"))
112 } else {
113 None
114 }
115 });
116
117 Self::IntoIter {
118 input: if eof { None } else { Some(input) },
119 buffer,
120 total_pos: 0,
121 head_range: 0..total_count,
122 log_entry_regex,
123 }
124 }
125}
126
/// Errors reported by the XML extraction iterator.
#[derive(Debug, Error)]
pub enum XmlExtractorError {
    /// An I/O operation failed; wraps the underlying [`std::io::Error`].
    #[error("I/O error")]
    Io(#[from] std::io::Error),
    /// A regular expression error; wraps the underlying [`regex::Error`].
    #[error("Regex error")]
    Regex(#[from] regex::Error),
}
137
/// Iterator over the XML fragments found in a [`BufRead`] input.
///
/// Each item is the serialized text of one extracted fragment, or an
/// [`XmlExtractorError`].
#[cfg_attr(feature = "debug", derive(Debug))]
pub struct XmlExtractorIter<I: BufRead, const B: usize> {
    /// Remaining input; `None` makes `next` return `None` immediately.
    input: Option<I>,
    /// Buffer holding the most recently read chunk of input.
    buffer: [u8; B],
    /// Position bookkeeping within `buffer`, updated by `next`.
    total_pos: usize,
    /// Pattern that, when it matches a text node inside a candidate
    /// element, causes that candidate to be rejected.
    log_entry_regex: Option<Regex>,
    /// Part of `buffer` that the next call to `next` starts scanning.
    head_range: Range<usize>,
}
147
148impl<I: BufRead, const B: usize> Iterator for XmlExtractorIter<I, B> {
149 type Item = Result<String, XmlExtractorError>;
150
151 fn next(&mut self) -> Option<Self::Item> {
152 let &mut Self {
153 ref mut log_entry_regex,
154 ref mut head_range,
155 ref mut total_pos,
156 ref mut input,
157 ref mut buffer,
158 } = self;
159
160 let mut stream = input.take()?;
161
162 let mut head = &buffer[head_range.clone()];
163
164 let mut result = None;
165
166 loop {
167 while let Some(pos) = head.iter().position(|&n| n == b'<') {
168 head = &head[pos..];
169 *total_pos += pos;
170 let cursor = Cursor::new(head);
171
172 let xml_candidate = cursor.chain(stream);
173
174 let (xml_candidate, events) = read_xmls(log_entry_regex, xml_candidate);
175
176 if let Ok(events) = events {
177 let mut writer = Writer::new(Cursor::new(Vec::new()));
178 for event in events {
179 if let Err(err) = writer.write_event(event) {
180 return Some(Err(err.into()));
181 }
182 }
183 let buf = writer.into_inner().into_inner();
184 let xml = String::from_utf8_lossy(&buf);
185 result = Some(Ok(format!("{xml}")));
186 }
187
188 let (cursor, remaining_input) = xml_candidate.into_inner();
189
190 stream = remaining_input;
191
192 let cursor_position = cursor.position() as usize;
193
194 if cursor_position < head.len() {
195 head = &head[cursor_position..];
196 if result.is_some() {
197 *total_pos += cursor_position;
198 if *total_pos > head_range.end {
199 *total_pos = head_range.end;
200 }
202 *head_range = *total_pos..head_range.end;
203 *input = Some(stream);
204 return result;
205 }
206 } else {
207 break;
208 }
209 }
210 if let Ok(count) = stream.read(buffer.as_mut()) {
211 if count == 0 {
212 break;
213 }
214 if result.is_none() {
215 head = &buffer[..count];
216 *total_pos = 0;
217 } else {
218 *head_range = 0..count;
219 *total_pos = 0;
220 *input = Some(stream);
221 break;
222 }
223 } else {
224 *input = Some(stream);
225 break;
226 }
227 }
228 result
229 }
230}
231
232fn read_xmls<I: BufRead>(
233 log_entry_regex: &mut Option<Regex>,
234 xml_candidate: I,
235) -> (I, Result<Vec<Event<'static>>, ()>) {
236 let mut reader = Reader::from_reader(xml_candidate);
237
238 let mut buf = Vec::new();
239
240 let events = match reader.read_event_into(&mut buf) {
241 Ok(Event::Start(ref b)) => {
242 let (start, end) = (b.clone().into_owned(), b.to_end().into_owned());
243
244 let end = end.name();
245
246 let mut depth = 0;
247 let mut events = vec![Event::Start(start)];
248
249 loop {
250 let evt = reader.read_event_into(&mut buf);
251
252 if let Ok(e) = &evt {
253 events.push(e.clone().into_owned());
254 }
255
256 match evt {
257 Ok(Event::Start(ref e)) if e.name() == end => depth += 1,
258 Ok(Event::End(ref e)) if e.name() == end => {
259 if depth == 0 {
260 break Ok(events);
261 }
262 depth -= 1;
263 }
264 Ok(Event::Text(e)) => {
265 if let Some(log_entry_regex) = log_entry_regex.as_ref() {
266 if log_entry_regex.is_match(&e) {
267 break Err(());
268 }
269 }
270 }
271 Ok(Event::Eof) | Err(_) => break Err(()),
272 _ => (),
273 }
274 }
275 }
276 Ok(e @ Event::Empty(_)) => Ok(vec![e.clone().into_owned()]),
277 _ => Err(()),
278 };
279 (reader.into_inner(), events)
280}
281
#[cfg(test)]
mod tests {
    use std::io::BufReader;

    use crate::XmlExtractor;

    /// Extracts every fragment from the fixture log and checks them in order.
    #[test]
    fn test_xml_extractor_iter() {
        let log = include_bytes!("../fixtures/example.log").as_slice();
        let mut fragments = XmlExtractor::<_, 1024>::new(BufReader::new(log), None).into_iter();

        let expected = [
            "<hello>\n <world/>\n</hello>",
            r#"<simple qqq="aaa"/>"#,
            "<another></another>",
        ];
        for fragment in expected {
            assert_eq!(
                fragments.next().transpose().unwrap(),
                Some(fragment.to_string())
            );
        }
        assert_eq!(fragments.next().transpose().unwrap(), None);
    }

    /// Several fragments on one line, and a nested one on the next line.
    #[test]
    fn test_xml_extractor_iter2() {
        let log = b"qqq <hello/> <world/>\nqqq <next><child/></next>".as_slice();
        let mut fragments = XmlExtractor::<_, 1024>::new(BufReader::new(log), None)
            .into_iter()
            .flatten();

        for fragment in ["<hello/>", "<world/>", "<next><child/></next>"] {
            assert_eq!(fragments.next(), Some(fragment.to_string()));
        }
        assert_eq!(fragments.next(), None);
    }
}