1#![doc = include_str!("../README.md")]
2#![warn(
3 missing_docs,
4 missing_debug_implementations,
5 missing_copy_implementations,
6 trivial_casts,
7 trivial_numeric_casts,
8 unused_extern_crates,
9 unused_import_braces,
10 unused_qualifications,
11 variant_size_differences
12)]
13
14use std::{
15 io::{BufRead, Cursor, Read},
16 ops::Range,
17};
18
19use quick_xml::{Reader, Writer, events::Event};
20use regex::bytes::Regex;
21use thiserror::Error;
22
23pub fn filter_xmls(input: impl BufRead, xpath: Option<&str>) {
25 let xml_extractor = XmlExtractor::<_, 1024>::new(input, None).into_iter();
26 if let Some(xpath) = xpath {
27 for entry in xml_extractor.map(|xml| {
28 xml.map(|xml| {
29 use amxml::dom::*;
30 let doc = new_document(&xml).expect("well formed XML");
31 let root = doc.root_element();
32 let result = root.eval_xpath(xpath).expect("XPath expression");
33
34 (0..result.len())
35 .map(|i| result.get_item(i))
36 .map(|item| {
37 if let Some(node) = item.as_nodeptr() {
38 node.to_string()
39 } else {
40 item.to_string()
41 }
42 })
43 .collect::<Vec<_>>()
44 .join("\n")
45 })
46 }) {
47 println!("{}", entry.expect("next xml"));
48 }
49 } else {
50 for entry in xml_extractor {
51 println!("{}", entry.expect("next xml"));
52 }
53 }
54}
55
/// Extracts XML fragments embedded in a byte stream.
///
/// `I` is the stream to scan and `B` is the size in bytes of the internal read buffer
/// (1024 when not specified). Converting the extractor with [`IntoIterator`] yields an
/// iterator over the fragments that were found.
#[cfg_attr(feature = "debug", derive(Debug))]
pub struct XmlExtractor<I: BufRead, const B: usize = 1024> {
    /// Stream to scan; always `Some` after [`XmlExtractor::new`].
    input: Option<I>,
    /// Fixed-size buffer the stream is read into.
    buffer: [u8; B],
    /// Optional pattern that rejects an XML candidate when it matches text inside the
    /// candidate; when `None`, a default is picked from the first bytes of the stream
    /// during `into_iter`.
    log_entry_regex: Option<Regex>,
}
63
64impl<I: BufRead, const B: usize> XmlExtractor<I, B> {
65 pub fn new(input: I, log_entry_regex: Option<Regex>) -> XmlExtractor<I, B> {
67 XmlExtractor {
68 input: Some(input),
69 buffer: [0; B],
70 log_entry_regex,
71 }
72 }
73}
74
75impl<I: BufRead, const B: usize> IntoIterator for XmlExtractor<I, B> {
76 type Item = Result<String, XmlExtractorError>;
77 type IntoIter = XmlExtractorIter<I, B>;
78
79 fn into_iter(self) -> Self::IntoIter {
80 const LOG_ENTRY_DATE_MIN_LEN: usize = 19;
81 let Self {
82 mut input,
83 mut buffer,
84 log_entry_regex,
85 } = self;
86 let Some(mut input) = input.take() else {
87 panic!("Input stream is empty");
88 };
89 let mut total_count = 0;
90 let mut eof = false;
91 while let Ok(count) = input.read(&mut buffer[total_count..]) {
92 if count == 0 {
93 eof = true;
94 break;
95 }
96 total_count += count;
97 if total_count > LOG_ENTRY_DATE_MIN_LEN {
98 break;
99 }
100 }
101 let log_entry_regex = log_entry_regex.or_else(|| {
102 const LOG_ENTRY_DATE_REGEX: &str = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}";
103 const LOG_NEW_ENTRY_DATE_REGEX: &str = r"\n\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}";
104 const LOG_ENTRY_NON_WS_REGEX: &str = r"^\S";
105 const LOG_NEW_ENTRY_NON_WS_REGEX: &str = r"\n\S";
106
107 if total_count > LOG_ENTRY_DATE_MIN_LEN
108 && Regex::new(LOG_ENTRY_DATE_REGEX)
109 .expect("valid regex")
110 .is_match(&buffer[..total_count])
111 {
112 Some(Regex::new(LOG_NEW_ENTRY_DATE_REGEX).expect("valid regex"))
113 } else if Regex::new(LOG_ENTRY_NON_WS_REGEX)
114 .expect("valid regex")
115 .is_match(&buffer[..total_count])
116 {
117 Some(Regex::new(LOG_NEW_ENTRY_NON_WS_REGEX).expect("valid regex"))
118 } else {
119 None
120 }
121 });
122
123 Self::IntoIter {
124 input: if eof { None } else { Some(input) },
125 buffer,
126 total_pos: 0,
127 head_range: 0..total_count,
128 log_entry_regex,
129 }
130 }
131}
132
/// Errors reported while extracting XML fragments.
#[derive(Debug, Error)]
pub enum XmlExtractorError {
    /// Wraps a [`std::io::Error`].
    #[error("I/O error")]
    Io(#[from] std::io::Error),
    /// Wraps a [`regex::Error`].
    #[error("Regex error")]
    Regex(#[from] regex::Error),
}
143
/// Iterator over the XML fragments found in a stream, created from an `XmlExtractor`.
#[cfg_attr(feature = "debug", derive(Debug))]
pub struct XmlExtractorIter<I: BufRead, const B: usize> {
    /// Remaining stream; taken at the start of every `next` call and only put back
    /// when the call leaves more data to scan, so `None` ends the iteration.
    input: Option<I>,
    /// Fixed-size buffer holding the chunk of the stream currently being scanned.
    buffer: [u8; B],
    /// Offset into `buffer` from which `head_range` is rebuilt after a fragment has
    /// been returned.
    total_pos: usize,
    /// Pattern handed to the parser; a candidate is rejected when text inside it
    /// matches.
    log_entry_regex: Option<Regex>,
    /// Range of `buffer` that the next call scans first.
    head_range: Range<usize>,
}
153
154impl<I: BufRead, const B: usize> Iterator for XmlExtractorIter<I, B> {
155 type Item = Result<String, XmlExtractorError>;
156
157 fn next(&mut self) -> Option<Self::Item> {
158 let &mut Self {
159 ref mut log_entry_regex,
160 ref mut head_range,
161 ref mut total_pos,
162 ref mut input,
163 ref mut buffer,
164 } = self;
165
166 let mut stream = input.take()?;
167
168 let mut head = &buffer[head_range.clone()];
169
170 let mut result = None;
171
172 loop {
173 while let Some(pos) = head.iter().position(|&n| n == b'<') {
174 head = &head[pos..];
175 *total_pos += pos;
176 let cursor = Cursor::new(head);
177
178 let xml_candidate = cursor.chain(stream);
179
180 let (xml_candidate, events) = read_xmls(log_entry_regex, xml_candidate);
181
182 if let Ok(events) = events {
183 let mut writer = Writer::new(Cursor::new(Vec::new()));
184 for event in events {
185 if let Err(err) = writer.write_event(event) {
186 return Some(Err(err.into()));
187 }
188 }
189 let buf = writer.into_inner().into_inner();
190 let xml = String::from_utf8_lossy(&buf);
191 result = Some(Ok(format!("{xml}")));
192 }
193
194 let (cursor, remaining_input) = xml_candidate.into_inner();
195
196 stream = remaining_input;
197
198 let cursor_position = cursor.position() as usize;
199
200 if cursor_position < head.len() {
201 head = &head[cursor_position..];
202 if result.is_some() {
203 *total_pos += cursor_position;
204 if *total_pos > head_range.end {
205 *total_pos = head_range.end;
206 }
208 *head_range = *total_pos..head_range.end;
209 *input = Some(stream);
210 return result;
211 }
212 } else {
213 break;
214 }
215 }
216 if let Ok(count) = stream.read(buffer.as_mut()) {
217 if count == 0 {
218 break;
219 }
220 if result.is_none() {
221 head = &buffer[..count];
222 *total_pos = 0;
223 } else {
224 *head_range = 0..count;
225 *total_pos = 0;
226 *input = Some(stream);
227 break;
228 }
229 } else {
230 *input = Some(stream);
231 break;
232 }
233 }
234 result
235 }
236}
237
238fn read_xmls<I: BufRead>(
239 log_entry_regex: &mut Option<Regex>,
240 xml_candidate: I,
241) -> (I, Result<Vec<Event<'static>>, ()>) {
242 let mut reader = Reader::from_reader(xml_candidate);
243
244 let mut buf = Vec::new();
245
246 let events = match reader.read_event_into(&mut buf) {
247 Ok(Event::Start(ref b)) => {
248 let (start, end) = (b.clone().into_owned(), b.to_end().into_owned());
249
250 let end = end.name();
251
252 let mut depth = 0;
253 let mut events = vec![Event::Start(start)];
254
255 loop {
256 let evt = reader.read_event_into(&mut buf);
257
258 if let Ok(e) = &evt {
259 events.push(e.clone().into_owned());
260 }
261
262 match evt {
263 Ok(Event::Start(ref e)) if e.name() == end => depth += 1,
264 Ok(Event::End(ref e)) if e.name() == end => {
265 if depth == 0 {
266 break Ok(events);
267 }
268 depth -= 1;
269 }
270 Ok(Event::Text(e)) => {
271 if let Some(log_entry_regex) = log_entry_regex.as_ref() {
272 if log_entry_regex.is_match(&e) {
273 break Err(());
274 }
275 }
276 }
277 Ok(Event::Eof) | Err(_) => break Err(()),
278 _ => (),
279 }
280 }
281 }
282 Ok(e @ Event::Empty(_)) => Ok(vec![e.clone().into_owned()]),
283 _ => Err(()),
284 };
285 (reader.into_inner(), events)
286}
287
#[cfg(test)]
mod tests {
    use std::io::BufReader;

    use crate::XmlExtractor;

    /// The fixture log yields exactly three fragments, in order, without errors.
    #[test]
    fn test_xml_extractor_iter() {
        let log = include_bytes!("../fixtures/example.log").as_slice();

        let fragments = XmlExtractor::<_, 1024>::new(BufReader::new(log), None)
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(
            fragments,
            [
                "<hello>\n <world/>\n</hello>",
                r#"<simple qqq="aaa"/>"#,
                "<another></another>",
            ]
        );
    }

    /// Several fragments on one line and a nested element are all found.
    #[test]
    fn test_xml_extractor_iter2() {
        let log = b"qqq <hello/> <world/>\nqqq <next><child/></next>".as_slice();

        let fragments: Vec<String> = XmlExtractor::<_, 1024>::new(BufReader::new(log), None)
            .into_iter()
            .flatten()
            .collect();

        assert_eq!(
            fragments,
            ["<hello/>", "<world/>", "<next><child/></next>"]
        );
    }
}