use std::collections::HashMap;
use std::io::{BufRead, Write};
use crate::xpath::XPathSource;
use super::builder::{Handler, HandlerCallback};
use super::editable::EditableNode;
use super::error::{TransformError, TransformResult};
use super::streaming;
use super::xpath_analyze::{self, StreamableXPath, XPathAnalysis};
/// Builder-style streaming transformer that pulls its input from a
/// [`BufRead`] source instead of an in-memory buffer.
///
/// Handlers are registered with `on`, namespace bindings with `namespace` /
/// `namespaces`, and the transformation is started with `run_to_writer` or
/// `for_each`. The lifetime `'a` bounds whatever the registered callbacks
/// borrow.
pub struct StreamTransformerReader<'a, R>
where
    R: BufRead,
{
    /// Source the input is read from; handed to the streaming backend on run.
    reader: R,
    /// Registered `(xpath, callback)` handlers, in registration order.
    handlers: Vec<Handler<'a>>,
    /// Namespace prefix -> URI bindings made available to XPath matching.
    namespaces: HashMap<String, String>,
}
impl<'a, R: BufRead> StreamTransformerReader<'a, R> {
pub fn new(reader: R) -> Self {
Self {
reader,
handlers: Vec::new(),
namespaces: HashMap::new(),
}
}
pub fn on<F>(mut self, xpath: &str, callback: F) -> Self
where
F: FnMut(&mut EditableNode) + 'a,
{
self.handlers.push(Handler {
xpath: XPathSource::String(xpath.to_string()),
callback: HandlerCallback::Simple(Box::new(callback)),
});
self
}
pub fn namespace(mut self, prefix: &str, uri: &str) -> Self {
self.namespaces.insert(prefix.to_string(), uri.to_string());
self
}
pub fn namespaces<I, S1, S2>(mut self, iter: I) -> Self
where
I: IntoIterator<Item = (S1, S2)>,
S1: AsRef<str>,
S2: AsRef<str>,
{
for (prefix, uri) in iter {
self.namespaces
.insert(prefix.as_ref().to_string(), uri.as_ref().to_string());
}
self
}
pub fn run_to_writer<W: Write>(self, writer: &mut W) -> TransformResult<usize> {
if self.handlers.is_empty() {
return Err(TransformError::InvalidXPath(
"No handlers registered. Use .on() to add handlers.".to_string(),
));
}
self.execute_transform_reader(writer)
}
pub fn for_each(self) -> TransformResult<()> {
if self.handlers.is_empty() {
return Err(TransformError::InvalidXPath(
"No handlers registered. Use .on() to add handlers.".to_string(),
));
}
self.execute_for_each_reader()?;
Ok(())
}
fn execute_transform_reader<W: Write>(mut self, writer: &mut W) -> TransformResult<usize> {
let mut analyses: Vec<XPathAnalysis> = Vec::with_capacity(self.handlers.len());
for handler in &self.handlers {
let expr = handler.xpath.parse()?;
analyses.push(xpath_analyze::analyze_xpath(&expr));
}
let mut streamable_xpaths: Vec<StreamableXPath> = Vec::with_capacity(analyses.len());
for (i, analysis) in analyses.into_iter().enumerate() {
match analysis {
XPathAnalysis::Streamable(s) => streamable_xpaths.push(s),
XPathAnalysis::NotStreamable(reason) => {
let xpath_str = self.handlers[i]
.xpath
.as_string()
.map(|s| s.to_string())
.unwrap_or_else(|| "<ast>".to_string());
return Err(TransformError::NotStreamable {
xpath: xpath_str,
reason,
});
}
}
}
if self.handlers.len() == 1 {
let handler = self.handlers.remove(0);
match handler.callback {
HandlerCallback::Simple(mut f) => {
return streaming::process_streaming_from_reader(
self.reader,
&streamable_xpaths[0],
&self.namespaces,
|node| f(node),
writer,
);
}
HandlerCallback::WithContext(_) => {
return Err(TransformError::InvalidXPath(
"WithContext callbacks are not supported in reader mode".to_string(),
));
}
}
}
type Callback<'a> = Box<dyn FnMut(&mut EditableNode) + 'a>;
let mut callbacks: Vec<Callback<'_>> = Vec::with_capacity(self.handlers.len());
for handler in self.handlers.iter_mut() {
if let HandlerCallback::Simple(f) = &mut handler.callback {
callbacks.push(Box::new(move |node: &mut EditableNode| f(node)));
} else {
return Err(TransformError::InvalidXPath(
"WithContext callbacks are not supported in reader mode".to_string(),
));
}
}
let mut handler_pairs: Vec<streaming::MultiTransformHandler<'_>> = streamable_xpaths
.iter()
.zip(callbacks.iter_mut())
.map(|(xpath, cb)| (xpath, cb.as_mut() as &mut dyn FnMut(&mut EditableNode)))
.collect();
streaming::process_streaming_multi_from_reader(
self.reader,
&mut handler_pairs,
&self.namespaces,
writer,
)
}
fn execute_for_each_reader(mut self) -> TransformResult<usize> {
let mut analyses: Vec<XPathAnalysis> = Vec::with_capacity(self.handlers.len());
for handler in &self.handlers {
let expr = handler.xpath.parse()?;
analyses.push(xpath_analyze::analyze_xpath(&expr));
}
let mut streamable_xpaths: Vec<StreamableXPath> = Vec::with_capacity(analyses.len());
for (i, analysis) in analyses.into_iter().enumerate() {
match analysis {
XPathAnalysis::Streamable(s) => streamable_xpaths.push(s),
XPathAnalysis::NotStreamable(reason) => {
let xpath_str = self.handlers[i]
.xpath
.as_string()
.map(|s| s.to_string())
.unwrap_or_else(|| "<ast>".to_string());
return Err(TransformError::NotStreamable {
xpath: xpath_str,
reason,
});
}
}
}
if self.handlers.len() == 1 {
let handler = self.handlers.remove(0);
match handler.callback {
HandlerCallback::Simple(mut f) => {
return streaming::process_for_each_from_reader(
self.reader,
&streamable_xpaths[0],
&self.namespaces,
|node| f(node),
);
}
HandlerCallback::WithContext(_) => {
return Err(TransformError::InvalidXPath(
"WithContext callbacks are not supported in reader mode".to_string(),
));
}
}
}
type Callback<'a> = Box<dyn FnMut(&mut EditableNode) + 'a>;
let mut callbacks: Vec<Callback<'_>> = Vec::with_capacity(self.handlers.len());
for handler in self.handlers.iter_mut() {
if let HandlerCallback::Simple(f) = &mut handler.callback {
callbacks.push(Box::new(move |node: &mut EditableNode| f(node)));
} else {
return Err(TransformError::InvalidXPath(
"WithContext callbacks are not supported in reader mode".to_string(),
));
}
}
let mut handler_pairs: Vec<streaming::MultiHandler<'_>> = streamable_xpaths
.iter()
.zip(callbacks.iter_mut())
.map(|(xpath, cb)| (xpath, cb.as_mut() as &mut dyn FnMut(&mut EditableNode)))
.collect();
streaming::process_for_each_multi_from_reader(
self.reader,
&mut handler_pairs,
&self.namespaces,
)
}
}