use crate::streaming::{StreamConfig, StreamItem, StreamPosition};
use crate::{from_xml, to_xml, FromXmlConfig, ToXmlConfig};
use hedl_core::Document;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
/// Reads the file at `path` without blocking the executor and parses its
/// contents as XML.
///
/// The whole file is loaded into a `String` before parsing starts.
///
/// # Errors
///
/// Returns `"Failed to read file: ..."` when the file cannot be read, or the
/// error produced by [`from_xml`] when the contents cannot be converted.
pub async fn from_xml_file_async(
    path: impl AsRef<std::path::Path>,
    config: &FromXmlConfig,
) -> Result<Document, String> {
    match tokio::fs::read_to_string(path).await {
        Ok(text) => from_xml(&text, config),
        Err(io_err) => Err(format!("Failed to read file: {}", io_err)),
    }
}
/// Serializes `doc` to XML and writes the result to `path` asynchronously.
///
/// The document is rendered completely in memory first; nothing is written
/// if rendering fails.
///
/// # Errors
///
/// Returns the error produced by [`to_xml`], or `"Failed to write file: ..."`
/// when the file cannot be written.
pub async fn to_xml_file_async(
    doc: &Document,
    path: impl AsRef<std::path::Path>,
    config: &ToXmlConfig,
) -> Result<(), String> {
    let rendered = to_xml(doc, config)?;
    tokio::fs::write(path, rendered)
        .await
        .map_err(|io_err| format!("Failed to write file: {}", io_err))
}
/// Reads `reader` to its end and parses the collected text as XML.
///
/// # Errors
///
/// Returns `"Failed to read XML: ..."` when reading fails, or the error
/// produced by [`from_xml`] when the text cannot be converted.
pub async fn from_xml_reader_async<R: AsyncRead + Unpin>(
    mut reader: R,
    config: &FromXmlConfig,
) -> Result<Document, String> {
    let mut text = String::new();
    if let Err(io_err) = reader.read_to_string(&mut text).await {
        return Err(format!("Failed to read XML: {}", io_err));
    }
    from_xml(&text, config)
}
/// Serializes `doc` to XML, writes it to `writer`, and flushes the writer.
///
/// # Errors
///
/// Returns the error produced by [`to_xml`], `"Failed to write XML: ..."`
/// when the write fails, or `"Failed to flush writer: ..."` when the final
/// flush fails.
pub async fn to_xml_writer_async<W: AsyncWrite + Unpin>(
    doc: &Document,
    mut writer: W,
    config: &ToXmlConfig,
) -> Result<(), String> {
    let rendered = to_xml(doc, config)?;

    if let Err(io_err) = writer.write_all(rendered.as_bytes()).await {
        return Err(format!("Failed to write XML: {}", io_err));
    }

    // Flush explicitly so buffered writers report errors here instead of
    // dropping them silently.
    match writer.flush().await {
        Ok(()) => Ok(()),
        Err(io_err) => Err(format!("Failed to flush writer: {}", io_err)),
    }
}
pub async fn from_xml_stream_async<R: AsyncRead + Unpin + Send + 'static>(
reader: R,
config: &StreamConfig,
) -> Result<AsyncXmlStream<R>, String> {
AsyncXmlStream::new(reader, config.clone())
}
/// Asynchronous source of [`StreamItem`]s backed by an [`AsyncRead`] reader.
///
/// The input is read in chunks into an in-memory buffer and then handed to
/// the synchronous stream parser; the parsed results are queued and handed
/// out one at a time.
pub struct AsyncXmlStream<R: AsyncRead + Unpin> {
    /// Asynchronous byte source the XML is pulled from.
    reader: R,
    /// Configuration passed on to the synchronous stream parser.
    config: StreamConfig,
    /// Number of bytes requested from `reader` per read call.
    chunk_size: usize,
    /// Every byte received from `reader` so far.
    buffer: Vec<u8>,
    /// Running total of bytes received from `reader`.
    byte_position: usize,
    /// Set once the input has been read to its end and parsed.
    fully_parsed: bool,
    /// Parse results waiting to be handed out, in parser order.
    parsed_items: std::collections::VecDeque<Result<StreamItem, String>>,
    /// Holds the count of successfully returned items.
    stream_position: StreamPosition,
}
impl<R: AsyncRead + Unpin> AsyncXmlStream<R> {
    /// Builds a stream that will pull XML from `reader` using `config`.
    ///
    /// `config.buffer_size` is the number of bytes requested per read. A
    /// value of zero is raised to one: reading into an empty buffer yields
    /// `Ok(0)`, which would be mistaken for end of input and make the stream
    /// silently empty.
    ///
    /// No I/O is performed here, and this constructor currently never
    /// returns `Err`.
    pub fn new(reader: R, config: StreamConfig) -> Result<Self, String> {
        let chunk_size = config.buffer_size.max(1);
        Ok(Self {
            reader,
            config,
            buffer: Vec::new(),
            byte_position: 0,
            chunk_size,
            parsed_items: std::collections::VecDeque::new(),
            fully_parsed: false,
            stream_position: StreamPosition::default(),
        })
    }

    /// Reports progress: the number of bytes read from the reader so far and
    /// the number of items successfully returned by [`next`](Self::next).
    #[inline]
    pub fn position(&self) -> StreamPosition {
        StreamPosition {
            byte_offset: self.byte_position as u64,
            items_parsed: self.stream_position.items_parsed,
        }
    }

    /// Reads up to `chunk_size` bytes and appends them to the internal buffer.
    ///
    /// Returns the number of bytes read; `0` is treated by the caller as end
    /// of input. The data goes through a scratch vector first, so the shared
    /// buffer is only touched after the read has completed.
    async fn read_chunk(&mut self) -> Result<usize, String> {
        let mut chunk = vec![0u8; self.chunk_size];
        let n = self
            .reader
            .read(&mut chunk)
            .await
            .map_err(|e| format!("Failed to read chunk: {}", e))?;
        if n > 0 {
            self.buffer.extend_from_slice(&chunk[..n]);
            self.byte_position += n;
        }
        Ok(n)
    }

    /// Reads the reader to its end and parses the buffered bytes, once.
    ///
    /// Every parser result (or the parser's construction error) is queued in
    /// `parsed_items`. A read error is returned immediately and leaves the
    /// stream un-parsed, so a later call resumes reading where it stopped.
    async fn ensure_parsed(&mut self) -> Result<(), String> {
        if self.fully_parsed {
            return Ok(());
        }

        // Drain the reader; a zero-byte read marks the end of the input.
        while self.read_chunk().await? != 0 {}

        if !self.buffer.is_empty() {
            use crate::streaming::from_xml_stream;
            use std::io::Cursor;

            let cursor = Cursor::new(&self.buffer);
            match from_xml_stream(cursor, &self.config) {
                Ok(parser) => {
                    for result in parser {
                        self.parsed_items
                            .push_back(result.map_err(|e| e.to_string()));
                    }
                }
                Err(e) => self.parsed_items.push_back(Err(e)),
            }
        }

        self.fully_parsed = true;
        Ok(())
    }

    /// Returns the next parsed item, or `None` once every item has been
    /// handed out.
    ///
    /// The first call reads the whole input into memory and parses it; later
    /// calls only pop from the queue. Only `Ok` items advance the
    /// `items_parsed` counter reported by [`position`](Self::position).
    pub async fn next(&mut self) -> Option<Result<StreamItem, String>> {
        if let Err(e) = self.ensure_parsed().await {
            return Some(Err(e));
        }
        let item = self.parsed_items.pop_front();
        if matches!(item, Some(Ok(_))) {
            self.stream_position.items_parsed += 1;
        }
        item
    }
}
/// Parses `xml` on tokio's blocking thread pool so the calling task is not
/// stalled by the parse.
///
/// Both `xml` and `config` are copied because the blocking closure must own
/// its inputs.
///
/// # Errors
///
/// Returns the error produced by [`from_xml`], or `"Task join error: ..."`
/// when the blocking task does not run to completion.
pub async fn from_xml_async(xml: &str, config: &FromXmlConfig) -> Result<Document, String> {
    let owned_xml = String::from(xml);
    let owned_config = config.clone();
    let handle = tokio::task::spawn_blocking(move || from_xml(&owned_xml, &owned_config));
    match handle.await {
        Ok(parsed) => parsed,
        Err(join_err) => Err(format!("Task join error: {}", join_err)),
    }
}
/// Serializes `doc` to an XML string on tokio's blocking thread pool.
///
/// Both `doc` and `config` are cloned because the blocking closure must own
/// its inputs.
///
/// # Errors
///
/// Returns the error produced by [`to_xml`], or `"Task join error: ..."`
/// when the blocking task does not run to completion.
pub async fn to_xml_async(doc: &Document, config: &ToXmlConfig) -> Result<String, String> {
    let owned_doc = doc.clone();
    let owned_config = config.clone();
    let handle = tokio::task::spawn_blocking(move || to_xml(&owned_doc, &owned_config));
    match handle.await {
        Ok(rendered) => rendered,
        Err(join_err) => Err(format!("Task join error: {}", join_err)),
    }
}
/// Reads and parses several XML files, keeping at most `concurrency` file
/// tasks in flight at any time.
///
/// `paths` is collected eagerly. The returned vector has one entry per input
/// path, in input order, regardless of the order in which tasks finish.
///
/// A `concurrency` of zero is treated as one; otherwise no task would ever
/// start and every entry would be reported as `"Missing result"`.
///
/// # Errors
///
/// Each entry carries the result of [`from_xml_file_async`] for its path.
/// If a task fails to join (for example because it panicked), the join error
/// does not identify the input it belonged to, so `"Task error: ..."` is
/// stored in the first still-empty entry. Any entry that never receives a
/// result is reported as `"Missing result"`.
pub async fn from_xml_files_concurrent<'a, I, P>(
    paths: I,
    config: &FromXmlConfig,
    concurrency: usize,
) -> Vec<Result<Document, String>>
where
    I: IntoIterator<Item = P>,
    P: AsRef<std::path::Path> + Send + 'a,
{
    use tokio::task::JoinSet;

    let concurrency = concurrency.max(1);
    let config = config.clone();

    let paths_vec: Vec<_> = paths.into_iter().collect();
    let total = paths_vec.len();
    let mut results: Vec<Option<Result<Document, String>>> = (0..total).map(|_| None).collect();
    let mut queue = paths_vec.into_iter().enumerate();
    let mut set: JoinSet<(usize, Result<Document, String>)> = JoinSet::new();

    loop {
        // Top the set up to the concurrency limit while inputs remain.
        while set.len() < concurrency {
            let Some((idx, path)) = queue.next() else {
                break;
            };
            let path = path.as_ref().to_path_buf();
            let config = config.clone();
            set.spawn(async move { (idx, from_xml_file_async(&path, &config).await) });
        }

        // `None` means the set is empty, and the top-up above guarantees the
        // queue is exhausted too, so all work is done.
        let Some(joined) = set.join_next().await else {
            break;
        };
        match joined {
            Ok((idx, outcome)) => results[idx] = Some(outcome),
            Err(e) => {
                if let Some(slot) = results.iter_mut().find(|r| r.is_none()) {
                    *slot = Some(Err(format!("Task error: {}", e)));
                }
            }
        }
    }

    results
        .into_iter()
        .map(|slot| slot.unwrap_or_else(|| Err("Missing result".to_string())))
        .collect()
}
/// Serializes several documents to XML files, keeping at most `concurrency`
/// write tasks in flight at any time.
///
/// `docs_and_paths` is collected eagerly, and each document is cloned into
/// its task when that task is started. The returned vector has one entry per
/// input pair, in input order, regardless of the order in which tasks finish.
///
/// A `concurrency` of zero is treated as one; otherwise no task would ever
/// start and every entry would be reported as `"Missing result"`.
///
/// # Errors
///
/// Each entry carries the result of [`to_xml_file_async`] for its pair. If a
/// task fails to join (for example because it panicked), the join error does
/// not identify the input it belonged to, so `"Task error: ..."` is stored in
/// the first still-empty entry. Any entry that never receives a result is
/// reported as `"Missing result"`.
pub async fn to_xml_files_concurrent<'a, I, P>(
    docs_and_paths: I,
    config: &ToXmlConfig,
    concurrency: usize,
) -> Vec<Result<(), String>>
where
    I: IntoIterator<Item = (&'a Document, P)>,
    P: AsRef<std::path::Path> + Send + 'a,
{
    use tokio::task::JoinSet;

    let concurrency = concurrency.max(1);
    let config = config.clone();

    let docs_and_paths_vec: Vec<_> = docs_and_paths.into_iter().collect();
    let total = docs_and_paths_vec.len();
    let mut results: Vec<Option<Result<(), String>>> = (0..total).map(|_| None).collect();
    let mut queue = docs_and_paths_vec.into_iter().enumerate();
    let mut set: JoinSet<(usize, Result<(), String>)> = JoinSet::new();

    loop {
        // Top the set up to the concurrency limit while inputs remain.
        while set.len() < concurrency {
            let Some((idx, (doc, path))) = queue.next() else {
                break;
            };
            let doc = doc.clone();
            let path = path.as_ref().to_path_buf();
            let config = config.clone();
            set.spawn(async move { (idx, to_xml_file_async(&doc, &path, &config).await) });
        }

        // `None` means the set is empty, and the top-up above guarantees the
        // queue is exhausted too, so all work is done.
        let Some(joined) = set.join_next().await else {
            break;
        };
        match joined {
            Ok((idx, outcome)) => results[idx] = Some(outcome),
            Err(e) => {
                if let Some(slot) = results.iter_mut().find(|r| r.is_none()) {
                    *slot = Some(Err(format!("Task error: {}", e)));
                }
            }
        }
    }

    results
        .into_iter()
        .map(|slot| slot.unwrap_or_else(|| Err("Missing result".to_string())))
        .collect()
}
// Unit tests for the async XML helpers: file, reader/writer, blocking-pool
// and batch entry points. All tests run on the tokio test runtime.
#[cfg(test)]
mod tests {
use super::*;
use hedl_core::{Item, Value};
use std::io::Cursor;
// Reading a path that does not exist must surface the read-error prefix.
#[tokio::test]
async fn test_from_xml_file_async_not_found() {
let config = FromXmlConfig::default();
let result = from_xml_file_async("/nonexistent/file.xml", &config).await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("Failed to read file"));
}
// Writing to a path that contains an interior NUL byte must fail.
#[tokio::test]
async fn test_to_xml_file_async_invalid_path() {
let doc = Document::new((2, 0));
let config = ToXmlConfig::default();
let result = to_xml_file_async(&doc, "/invalid/\0/path.xml", &config).await;
assert!(result.is_err());
}
// A small well-formed document read from an in-memory reader yields the
// expected integer scalar.
#[tokio::test]
async fn test_from_xml_reader_async_valid() {
let xml = r#"<?xml version="1.0"?><hedl><val>42</val></hedl>"#;
let cursor = Cursor::new(xml.as_bytes());
let config = FromXmlConfig::default();
let doc = from_xml_reader_async(cursor, &config).await.unwrap();
assert_eq!(
doc.root.get("val").and_then(|i| i.as_scalar()),
Some(&Value::Int(42))
);
}
// Empty input is expected to parse into a document with an empty root.
#[tokio::test]
async fn test_from_xml_reader_async_empty() {
let xml = "";
let cursor = Cursor::new(xml.as_bytes());
let config = FromXmlConfig::default();
let doc = from_xml_reader_async(cursor, &config).await.unwrap();
assert!(doc.root.is_empty());
}
// Writing a one-scalar document into a Vec produces the matching element.
#[tokio::test]
async fn test_to_xml_writer_async_valid() {
let mut doc = Document::new((2, 0));
doc.root
.insert("val".to_string(), Item::Scalar(Value::Int(42)));
let mut buffer = Vec::new();
let config = ToXmlConfig::default();
to_xml_writer_async(&doc, &mut buffer, &config)
.await
.unwrap();
let xml = String::from_utf8(buffer).unwrap();
assert!(xml.contains("<val>42</val>"));
}
// An empty document still produces the XML declaration and the root element.
#[tokio::test]
async fn test_to_xml_writer_async_empty() {
let doc = Document::new((2, 0));
let mut buffer = Vec::new();
let config = ToXmlConfig::default();
to_xml_writer_async(&doc, &mut buffer, &config)
.await
.unwrap();
let xml = String::from_utf8(buffer).unwrap();
assert!(xml.contains("<?xml"));
assert!(xml.contains("<hedl"));
}
// Parsing through the blocking pool returns the expected string scalar.
#[tokio::test]
async fn test_from_xml_async_valid() {
let xml = r#"<?xml version="1.0"?><hedl><name>test</name></hedl>"#;
let config = FromXmlConfig::default();
let doc = from_xml_async(xml, &config).await.unwrap();
assert_eq!(
doc.root.get("name").and_then(|i| i.as_scalar()),
Some(&Value::String("test".to_string().into()))
);
}
// A stray closing tag must be reported as an error, not a panic.
#[tokio::test]
async fn test_from_xml_async_invalid() {
let xml = r#"</invalid>"#;
let config = FromXmlConfig::default();
let result = from_xml_async(xml, &config).await;
assert!(result.is_err());
}
// Serializing through the blocking pool emits the scalar as an element.
#[tokio::test]
async fn test_to_xml_async_valid() {
let mut doc = Document::new((2, 0));
doc.root
.insert("val".to_string(), Item::Scalar(Value::Int(123)));
let config = ToXmlConfig::default();
let xml = to_xml_async(&doc, &config).await.unwrap();
assert!(xml.contains("<val>123</val>"));
}
// Serializing an empty document still emits the XML declaration.
#[tokio::test]
async fn test_to_xml_async_empty() {
let doc = Document::new((2, 0));
let config = ToXmlConfig::default();
let xml = to_xml_async(&doc, &config).await.unwrap();
assert!(xml.contains("<?xml"));
}
// Batch read with no inputs returns an empty result list.
#[tokio::test]
async fn test_from_xml_files_concurrent_empty() {
let paths: Vec<&str> = vec![];
let config = FromXmlConfig::default();
let results = from_xml_files_concurrent(&paths, &config, 4).await;
assert!(results.is_empty());
}
// Batch write with no inputs returns an empty result list.
#[tokio::test]
async fn test_to_xml_files_concurrent_empty() {
let docs_and_paths: Vec<(&Document, &str)> = vec![];
let config = ToXmlConfig::default();
let results = to_xml_files_concurrent(docs_and_paths, &config, 4).await;
assert!(results.is_empty());
}
// Non-ASCII text (accented Latin and CJK) survives the reader path intact.
#[tokio::test]
async fn test_from_xml_reader_async_unicode() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<hedl><name>héllo 世界</name></hedl>"#;
let cursor = Cursor::new(xml.as_bytes());
let config = FromXmlConfig::default();
let doc = from_xml_reader_async(cursor, &config).await.unwrap();
assert_eq!(
doc.root.get("name").and_then(|i| i.as_scalar()),
Some(&Value::String("héllo 世界".to_string().into()))
);
}
// A 10,000-character element value is read back unchanged.
#[tokio::test]
async fn test_from_xml_reader_async_large_value() {
let large_string = "x".repeat(10000);
let xml = format!(
r#"<?xml version="1.0"?><hedl><val>{}</val></hedl>"#,
large_string
);
let cursor = Cursor::new(xml.as_bytes());
let config = FromXmlConfig::default();
let doc = from_xml_reader_async(cursor, &config).await.unwrap();
assert_eq!(
doc.root.get("val").and_then(|i| i.as_scalar()),
Some(&Value::String(large_string.into()))
);
}
// Bool, int and string scalars survive a to_xml_async -> from_xml_async
// round trip with their types preserved.
#[tokio::test]
async fn test_round_trip_async() {
let mut doc = Document::new((2, 0));
doc.root
.insert("bool_val".to_string(), Item::Scalar(Value::Bool(true)));
doc.root
.insert("int_val".to_string(), Item::Scalar(Value::Int(42)));
doc.root.insert(
"string_val".to_string(),
Item::Scalar(Value::String("hello".to_string().into())),
);
let config_to = ToXmlConfig::default();
let xml = to_xml_async(&doc, &config_to).await.unwrap();
let config_from = FromXmlConfig::default();
let doc2 = from_xml_async(&xml, &config_from).await.unwrap();
assert_eq!(
doc2.root.get("bool_val").and_then(|i| i.as_scalar()),
Some(&Value::Bool(true))
);
assert_eq!(
doc2.root.get("int_val").and_then(|i| i.as_scalar()),
Some(&Value::Int(42))
);
assert_eq!(
doc2.root.get("string_val").and_then(|i| i.as_scalar()),
Some(&Value::String("hello".to_string().into()))
);
}
// Three parses joined concurrently each return their own document.
#[tokio::test]
async fn test_concurrent_parsing() {
let xml1 = r#"<?xml version="1.0"?><hedl><id>1</id></hedl>"#;
let xml2 = r#"<?xml version="1.0"?><hedl><id>2</id></hedl>"#;
let xml3 = r#"<?xml version="1.0"?><hedl><id>3</id></hedl>"#;
let config = FromXmlConfig::default();
let (r1, r2, r3) = tokio::join!(
from_xml_async(xml1, &config),
from_xml_async(xml2, &config),
from_xml_async(xml3, &config)
);
assert!(r1.is_ok());
assert!(r2.is_ok());
assert!(r3.is_ok());
assert_eq!(
r1.unwrap().root.get("id").and_then(|i| i.as_scalar()),
Some(&Value::Int(1))
);
assert_eq!(
r2.unwrap().root.get("id").and_then(|i| i.as_scalar()),
Some(&Value::Int(2))
);
assert_eq!(
r3.unwrap().root.get("id").and_then(|i| i.as_scalar()),
Some(&Value::Int(3))
);
}
// Two serializations joined concurrently each contain their own id.
#[tokio::test]
async fn test_concurrent_generation() {
let mut doc1 = Document::new((2, 0));
doc1.root
.insert("id".to_string(), Item::Scalar(Value::Int(1)));
let mut doc2 = Document::new((2, 0));
doc2.root
.insert("id".to_string(), Item::Scalar(Value::Int(2)));
let config = ToXmlConfig::default();
let (r1, r2) = tokio::join!(to_xml_async(&doc1, &config), to_xml_async(&doc2, &config));
assert!(r1.is_ok());
assert!(r2.is_ok());
assert!(r1.unwrap().contains("<id>1</id>"));
assert!(r2.unwrap().contains("<id>2</id>"));
}
}