Skip to main content

simdxml/batch/
mod.rs

1//! Columnar batch XPath evaluation.
2//!
3//! Evaluate an XPath expression against a batch of XML documents, returning
4//! results grouped by document. Amortizes XPath compilation and integrates
5//! bloom filtering and lazy parsing for maximum throughput.
6//!
7//! Results are returned as owned `String`s since the per-document indices
8//! are temporary.
9
10use crate::error::Result;
11use crate::xpath::CompiledXPath;
12
13/// Evaluate an XPath expression against a batch of documents, returning text results.
14///
15/// Each document is parsed independently with the compiled XPath evaluated against it.
16/// The XPath expression is compiled once and reused across all documents.
17pub fn eval_batch_text(
18    docs: &[&[u8]],
19    xpath: &CompiledXPath,
20) -> Result<Vec<Vec<String>>> {
21    let mut all_results = Vec::with_capacity(docs.len());
22
23    for &doc in docs {
24        let index = crate::parse(doc)?;
25        // Name index skipped: single-query-per-doc doesn't benefit from posting lists.
26        // The XPath evaluator falls back to linear tag_name_eq scan.
27        let texts: Vec<String> = xpath.eval_text(&index)?
28            .into_iter().map(|s| s.to_string()).collect();
29        all_results.push(texts);
30    }
31
32    Ok(all_results)
33}
34
35/// Evaluate with lazy parsing: only index tags relevant to the XPath query.
36pub fn eval_batch_text_lazy(
37    docs: &[&[u8]],
38    xpath: &CompiledXPath,
39) -> Result<Vec<Vec<String>>> {
40    let interesting = xpath.interesting_names();
41    let mut all_results = Vec::with_capacity(docs.len());
42
43    for &doc in docs {
44        let index = match &interesting {
45            Some(names) => crate::index::lazy::parse_for_query(doc, names)?,
46            None => crate::parse(doc)?,
47        };
48        // Name index skipped: single-query-per-doc doesn't benefit from posting lists.
49        // The XPath evaluator falls back to linear tag_name_eq scan.
50        let texts: Vec<String> = xpath.eval_text(&index)?
51            .into_iter().map(|s| s.to_string()).collect();
52        all_results.push(texts);
53    }
54
55    Ok(all_results)
56}
57
58/// Evaluate with bloom filtering + lazy parsing: skip documents that can't match.
59///
60/// For each document, first checks a bloom filter to see if it could possibly
61/// contain the target tag names. Documents that fail the bloom check get an
62/// empty result without any parsing.
63pub fn eval_batch_text_bloom(
64    docs: &[&[u8]],
65    xpath: &CompiledXPath,
66) -> Result<Vec<Vec<String>>> {
67    let interesting = xpath.interesting_names();
68    let target_names: Vec<Vec<u8>> = interesting.as_ref()
69        .map(|names| names.iter().map(|n| n.as_bytes().to_vec()).collect())
70        .unwrap_or_default();
71    let use_bloom = !target_names.is_empty();
72
73    let mut all_results = Vec::with_capacity(docs.len());
74
75    for &doc in docs {
76        // Bloom pre-filter
77        if use_bloom {
78            let bloom = crate::bloom::TagBloom::from_prescan(doc);
79            let refs: Vec<&[u8]> = target_names.iter().map(|n| n.as_slice()).collect();
80            if !bloom.may_contain_any(&refs) {
81                all_results.push(Vec::new());
82                continue;
83            }
84        }
85
86        let index = match &interesting {
87            Some(names) => crate::index::lazy::parse_for_query(doc, names)?,
88            None => crate::parse(doc)?,
89        };
90        // Name index skipped: single-query-per-doc doesn't benefit from posting lists.
91        // The XPath evaluator falls back to linear tag_name_eq scan.
92        let texts: Vec<String> = xpath.eval_text(&index)?
93            .into_iter().map(|s| s.to_string()).collect();
94        all_results.push(texts);
95    }
96
97    Ok(all_results)
98}
99
100/// Count matching nodes per document without extracting text.
101pub fn count_batch(
102    docs: &[&[u8]],
103    xpath: &CompiledXPath,
104) -> Result<Vec<usize>> {
105    let mut counts = Vec::with_capacity(docs.len());
106
107    for &doc in docs {
108        let index = crate::parse(doc)?;
109        // Name index skipped: single-query-per-doc doesn't benefit from posting lists.
110        // The XPath evaluator falls back to linear tag_name_eq scan.
111        let nodes = xpath.eval(&index)?;
112        counts.push(nodes.len());
113    }
114
115    Ok(counts)
116}
117
118/// Size threshold for intra-document parallelism.
119const LARGE_DOC_THRESHOLD: usize = 256 * 1024; // 256 KB
120
121/// Evaluate a batch of documents with automatic parallelism.
122///
123/// Automatically allocates threads between inter-document (parsing different
124/// docs on different threads) and intra-document (splitting large docs across
125/// threads) parallelism based on document sizes.
126///
127/// - Large docs (>256KB): get intra-document parallel parsing
128/// - All docs: processed concurrently across available threads
129/// - Bloom + lazy parsing applied automatically for selective queries
130pub fn eval_batch_parallel(
131    docs: &[&[u8]],
132    xpath: &CompiledXPath,
133    max_threads: usize,
134) -> Result<Vec<Vec<String>>> {
135    if docs.is_empty() {
136        return Ok(Vec::new());
137    }
138
139    let max_threads = max_threads.max(1);
140    let interesting = xpath.interesting_names();
141    let target_names: Vec<Vec<u8>> = interesting.as_ref()
142        .map(|names| names.iter().map(|n| n.as_bytes().to_vec()).collect())
143        .unwrap_or_default();
144    let use_bloom = !target_names.is_empty();
145
146    // Process documents in parallel using thread::scope
147    let results: Vec<Result<Vec<String>>> = std::thread::scope(|scope| {
148        // Determine concurrency: process up to max_threads docs simultaneously
149        // For large docs, each gets intra-document parallelism
150        let doc_concurrency = max_threads.min(docs.len());
151
152        let handles: Vec<_> = docs.iter().enumerate().map(|(_i, &doc)| {
153            let interesting = &interesting;
154            let target_names = &target_names;
155
156            scope.spawn(move || -> Result<Vec<String>> {
157                // Bloom pre-filter
158                if use_bloom {
159                    let bloom = crate::bloom::TagBloom::from_prescan(doc);
160                    let refs: Vec<&[u8]> = target_names.iter().map(|n| n.as_slice()).collect();
161                    if !bloom.may_contain_any(&refs) {
162                        return Ok(Vec::new());
163                    }
164                }
165
166                // Choose parse strategy based on document size
167                let index = if doc.len() >= LARGE_DOC_THRESHOLD {
168                    // Large doc: use intra-document parallelism
169                    // Allocate threads proportionally (at least 2)
170                    let doc_threads = (max_threads / doc_concurrency).max(2);
171                    let mut idx = crate::parallel::parse_parallel(doc, doc_threads)?;
172                    idx.ensure_indices();
173                    idx
174                } else {
175                    // Small doc: single-threaded parse
176                    match interesting {
177                        Some(names) => crate::index::lazy::parse_for_query(doc, names)?,
178                        None => crate::parse(doc)?,
179                    }
180                };
181
182                // Name index skipped: single-query-per-doc doesn't benefit from posting lists.
183        // The XPath evaluator falls back to linear tag_name_eq scan.
184                let texts: Vec<String> = xpath.eval_text(&index)?
185                    .into_iter().map(|s| s.to_string()).collect();
186                Ok(texts)
187            })
188        }).collect();
189
190        handles.into_iter().map(|h| h.join().unwrap()).collect()
191    });
192
193    // Collect results, propagating any errors
194    results.into_iter().collect()
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200
201    #[test]
202    fn batch_of_one() {
203        let doc = b"<root><claim>A device</claim></root>";
204        let xpath = CompiledXPath::compile("//claim").unwrap();
205        let results = eval_batch_text(&[doc.as_slice()], &xpath).unwrap();
206        assert_eq!(results, vec![vec!["A device"]]);
207    }
208
209    #[test]
210    fn batch_multiple_docs() {
211        let docs: Vec<&[u8]> = vec![
212            b"<r><claim>First</claim></r>",
213            b"<r><claim>Second</claim><claim>Third</claim></r>",
214            b"<r><other>No claims</other></r>",
215        ];
216
217        let xpath = CompiledXPath::compile("//claim").unwrap();
218        let results = eval_batch_text(&docs, &xpath).unwrap();
219
220        assert_eq!(results.len(), 3);
221        assert_eq!(results[0], vec!["First"]);
222        assert_eq!(results[1], vec!["Second", "Third"]);
223        assert!(results[2].is_empty());
224    }
225
226    #[test]
227    fn batch_matches_individual() {
228        let docs: Vec<&[u8]> = vec![
229            b"<r><a>1</a><b>2</b></r>",
230            b"<r><a>3</a></r>",
231            b"<r><b>4</b></r>",
232        ];
233
234        let xpath = CompiledXPath::compile("//a").unwrap();
235        let batch = eval_batch_text(&docs, &xpath).unwrap();
236
237        for (i, &doc) in docs.iter().enumerate() {
238            let index = crate::parse(doc).unwrap();
239            let individual: Vec<String> = xpath.eval_text(&index).unwrap()
240                .into_iter().map(|s| s.to_string()).collect();
241            assert_eq!(individual, batch[i], "doc {} mismatch", i);
242        }
243    }
244
245    #[test]
246    fn batch_lazy_matches_full() {
247        let docs: Vec<&[u8]> = vec![
248            b"<r><claim>A</claim><other>skip</other></r>",
249            b"<r><claim>B</claim></r>",
250        ];
251
252        let xpath = CompiledXPath::compile("//claim").unwrap();
253        let full = eval_batch_text(&docs, &xpath).unwrap();
254        let lazy = eval_batch_text_lazy(&docs, &xpath).unwrap();
255        assert_eq!(full, lazy);
256    }
257
258    #[test]
259    fn batch_bloom_skips_irrelevant() {
260        let docs: Vec<&[u8]> = vec![
261            b"<r><claim>A</claim></r>",
262            b"<r><other>no claims</other></r>",
263            b"<r><claim>B</claim></r>",
264        ];
265
266        let xpath = CompiledXPath::compile("//claim").unwrap();
267        let results = eval_batch_text_bloom(&docs, &xpath).unwrap();
268
269        assert_eq!(results.len(), 3);
270        assert_eq!(results[0], vec!["A"]);
271        assert!(results[1].is_empty());
272        assert_eq!(results[2], vec!["B"]);
273    }
274
275    #[test]
276    fn batch_empty() {
277        let docs: Vec<&[u8]> = vec![];
278        let xpath = CompiledXPath::compile("//claim").unwrap();
279        let results = eval_batch_text(&docs, &xpath).unwrap();
280        assert!(results.is_empty());
281    }
282
283    #[test]
284    fn batch_predicate() {
285        let docs: Vec<&[u8]> = vec![
286            br#"<r><claim type="independent">A</claim><claim type="dependent">B</claim></r>"#,
287            br#"<r><claim type="dependent">C</claim></r>"#,
288        ];
289
290        let xpath = CompiledXPath::compile("//claim[@type='independent']").unwrap();
291        let results = eval_batch_text(&docs, &xpath).unwrap();
292        assert_eq!(results[0], vec!["A"]);
293        assert!(results[1].is_empty());
294    }
295
296    #[test]
297    fn count_batch_works() {
298        let docs: Vec<&[u8]> = vec![
299            b"<r><a/><a/><b/></r>",
300            b"<r><a/></r>",
301            b"<r><b/></r>",
302        ];
303
304        let xpath = CompiledXPath::compile("//a").unwrap();
305        let counts = count_batch(&docs, &xpath).unwrap();
306        assert_eq!(counts, vec![2, 1, 0]);
307    }
308
309    #[test]
310    fn batch_bloom_all_match() {
311        let docs: Vec<&[u8]> = vec![
312            b"<r><claim>A</claim></r>",
313            b"<r><claim>B</claim></r>",
314        ];
315
316        let xpath = CompiledXPath::compile("//claim").unwrap();
317        let bloom_results = eval_batch_text_bloom(&docs, &xpath).unwrap();
318        let full_results = eval_batch_text(&docs, &xpath).unwrap();
319        assert_eq!(bloom_results, full_results);
320    }
321
322    #[test]
323    fn parallel_batch_matches_sequential() {
324        let docs: Vec<&[u8]> = vec![
325            b"<r><claim>A</claim><other>skip</other></r>",
326            b"<r><claim>B</claim><claim>C</claim></r>",
327            b"<r><other>no claims</other></r>",
328        ];
329
330        let xpath = CompiledXPath::compile("//claim").unwrap();
331        let seq_results = eval_batch_text(&docs, &xpath).unwrap();
332        let par_results = eval_batch_parallel(&docs, &xpath, 4).unwrap();
333        assert_eq!(seq_results, par_results);
334    }
335
336    #[test]
337    fn parallel_batch_single_thread() {
338        let docs: Vec<&[u8]> = vec![
339            b"<r><a>1</a></r>",
340            b"<r><a>2</a></r>",
341        ];
342        let xpath = CompiledXPath::compile("//a").unwrap();
343        let results = eval_batch_parallel(&docs, &xpath, 1).unwrap();
344        assert_eq!(results, vec![vec!["1"], vec!["2"]]);
345    }
346
347    #[test]
348    fn parallel_batch_empty() {
349        let docs: Vec<&[u8]> = vec![];
350        let xpath = CompiledXPath::compile("//a").unwrap();
351        let results = eval_batch_parallel(&docs, &xpath, 4).unwrap();
352        assert!(results.is_empty());
353    }
354}