use crate::{
interning::StringInterner,
model::{BlankNode, NamedNode, Object, Subject, Term, Triple},
optimization::{SimdXmlProcessor, TermInternerExt, ZeroCopyBuffer},
rdfxml::RdfXmlParseError,
};
use bumpalo::Bump;
#[cfg(feature = "parallel")]
use parking_lot::Mutex as ParkingLotMutex;
use quick_xml::{
escape::unescape,
events::{attributes::Attributes, BytesEnd, BytesStart, BytesText, Event},
Reader as XmlReader,
};
use std::io::BufReader;
#[cfg(not(feature = "parallel"))]
use std::sync::Mutex as ParkingLotMutex;
use std::{
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::{Duration, Instant},
};
use tokio::sync::mpsc;
/// Streaming RDF/XML parser that turns `quick_xml` events into triples without
/// building a DOM tree.
///
/// State is kept in two stacks (namespace scopes and currently open elements);
/// triples are buffered, batched and forwarded to an [`RdfXmlStreamingSink`]
/// by [`DomFreeStreamingRdfXmlParser::stream_parse`].
pub struct DomFreeStreamingRdfXmlParser {
    /// Tuning knobs supplied at construction time.
    config: RdfXmlStreamingConfig,
    /// Namespace scopes; all lookups and declarations use only the last entry.
    namespace_stack: Vec<NamespaceContext>,
    /// One context per open element, innermost last.
    element_stack: Vec<ElementContext>,
    /// Shared interner used to create named nodes, blank nodes and literals.
    term_interner: Arc<StringInterner>,
    /// Counters snapshotted into the statistics returned by `stream_parse`.
    performance_monitor: Arc<RdfXmlPerformanceMonitor>,
    /// Bump arena; reset once its allocated bytes exceed
    /// `config.memory_pressure_threshold`.
    arena: Bump,
    // Constructed in `new` but not otherwise used in this file.
    #[allow(dead_code)]
    buffer_pool: Arc<RdfXmlBufferPool>,
    /// SIMD helpers for UTF-8 validation, whitespace trimming and qname splitting.
    simd_processor: SimdXmlProcessor,
    // Constructed in `new` but not otherwise used in this file.
    #[allow(dead_code)]
    zero_copy_buffer: ZeroCopyBuffer,
}
/// Configuration for [`DomFreeStreamingRdfXmlParser`].
#[derive(Debug, Clone)]
pub struct RdfXmlStreamingConfig {
    /// Byte size handed to the buffer pool and the zero-copy buffer at construction.
    pub xml_buffer_size: usize,
    /// NOTE(review): not read anywhere in this file.
    pub max_namespace_depth: usize,
    /// Maximum element nesting; also used to pre-size the open-element stack.
    pub max_element_depth: usize,
    /// Selects the SIMD-assisted `*_zero_copy` decoding helpers for text and
    /// attributes instead of decoding through `quick_xml` directly.
    pub enable_zero_copy: bool,
    /// NOTE(review): not read anywhere in this file.
    pub enable_parallel_processing: bool,
    /// Number of buffered triples at which a batch is sent to the sink
    /// (checked when an end tag is processed).
    pub triple_batch_size: usize,
    /// Initial capacity of the bump arena, in bytes.
    pub arena_size: usize,
    /// Arena size in bytes above which the arena is reset between events.
    pub memory_pressure_threshold: usize,
}
/// A namespace scope: prefix bindings, default namespace and `xml:base`.
#[derive(Debug, Clone)]
pub struct NamespaceContext {
    /// Prefix -> namespace IRI bindings.
    pub prefixes: HashMap<String, String>,
    /// Namespace prepended to unprefixed names by `resolve_qname`.
    pub default_namespace: Option<String>,
    /// Base URI that non-absolute references are appended to by `resolve_uri`.
    pub base_uri: Option<String>,
}
/// Per-element parsing state kept on the parser's open-element stack.
#[derive(Debug, Clone)]
pub struct ElementContext {
    /// How the element is being handled.
    pub element_type: ElementType,
    /// Subject term: from `rdf:about` / `rdf:nodeID`, a fresh blank node for
    /// description and collection elements, or inherited from an enclosing
    /// description/property element.
    pub subject: Option<Term>,
    /// Predicate used when this element is a property element.
    pub predicate: Option<NamedNode>,
    /// Object term: from `rdf:resource`, from text content, or a blank node for
    /// `rdf:parseType="Resource"` / `"Collection"`.
    pub object: Option<Term>,
    /// `rdf:datatype`, or `rdf:XMLLiteral` for `rdf:parseType="Literal"`.
    pub datatype: Option<NamedNode>,
    /// Value of `xml:lang`.
    pub language: Option<String>,
    /// Attributes not consumed by the RDF/XML-specific handling, keyed by raw qname.
    pub attributes: HashMap<String, String>,
}
/// Classification of an XML element for RDF/XML processing.
#[derive(Debug, Clone, PartialEq)]
pub enum ElementType {
    /// The `rdf:RDF` root element.
    RdfRoot,
    /// A node element (`rdf:Description` or a name recognised as a type).
    Description,
    /// A property element.
    Property,
    /// `rdf:Bag`, `rdf:Seq` or `rdf:Alt`.
    Collection,
    /// A property element carrying `rdf:parseType`.
    ParseType(ParseType),
    /// Not recognised by `classify_element`; handled as a property element.
    Unknown,
}
/// Supported values of the `rdf:parseType` attribute.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParseType {
    /// `rdf:parseType="Resource"`.
    Resource,
    /// `rdf:parseType="Collection"`.
    Collection,
    /// `rdf:parseType="Literal"`.
    Literal,
}
/// Atomic counters describing a parse run, snapshotted by `get_statistics`.
pub struct RdfXmlPerformanceMonitor {
    /// Start tags seen, including self-closing tags.
    elements_processed: AtomicUsize,
    /// NOTE(review): only incremented by `record_triples_generated`, which has
    /// no caller in this file.
    triples_generated: AtomicUsize,
    /// NOTE(review): never incremented in this file.
    namespace_lookups: AtomicUsize,
    /// Attribute names and values decoded through the zero-copy helpers.
    zero_copy_operations: AtomicUsize,
    // Not read or written in this file.
    #[allow(dead_code)]
    memory_allocations: AtomicUsize,
    /// NOTE(review): never incremented in this file.
    parse_errors: AtomicUsize,
    /// Captured when the monitor is created, i.e. when the parser is constructed.
    start_time: Instant,
    // Allocated in `new` but not otherwise used in this file.
    #[allow(dead_code)]
    processing_times: Arc<ParkingLotMutex<VecDeque<Duration>>>,
}
/// Small pool of reusable byte buffers, capped at `max_buffers` entries.
pub struct RdfXmlBufferPool {
    // Pooled byte buffers handed out by `get_xml_buffer`.
    #[allow(dead_code)]
    xml_buffers: Arc<Mutex<Vec<Vec<u8>>>>,
    // Allocated in `new` but never read or written in this file.
    #[allow(dead_code)]
    string_buffers: Arc<Mutex<Vec<String>>>,
    // Maximum number of buffers `return_xml_buffer` keeps.
    #[allow(dead_code)]
    max_buffers: usize,
    // Capacity given to freshly allocated buffers.
    #[allow(dead_code)]
    buffer_size: usize,
}
/// Consumer of the triples produced by
/// [`DomFreeStreamingRdfXmlParser::stream_parse`].
///
/// The `Send`/`Sync` bounds and `Send` futures allow `stream_parse` to move the
/// sink into a `tokio::spawn`ed task.
pub trait RdfXmlStreamingSink: Send + Sync {
    /// Error reported back to the parser (wrapped in `RdfXmlParseError`).
    type Error: Send + Sync + std::error::Error;
    /// Receives one batch of triples.
    fn process_triple_stream(
        &mut self,
        triples: Vec<Triple>,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
    /// Records a namespace prefix binding.
    ///
    /// NOTE(review): not called by the parser in this file.
    fn process_namespace_declaration(
        &mut self,
        prefix: &str,
        namespace: &str,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
    /// Called by the sink task once the batch channel has been closed.
    fn flush_output(&mut self)
        -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
    /// Returns a snapshot of the sink's counters.
    fn get_statistics(&self) -> RdfXmlSinkStatistics;
}
/// Counters reported by an [`RdfXmlStreamingSink`].
///
/// The in-memory sink in this file updates only `triples_processed` and
/// `namespaces_declared`; the remaining fields keep their initial values there.
#[derive(Debug, Clone)]
pub struct RdfXmlSinkStatistics {
    /// Total triples received.
    pub triples_processed: usize,
    /// Number of namespace declarations received.
    pub namespaces_declared: usize,
    /// Triples per second.
    pub processing_rate_tps: f64,
    /// Memory used by the sink, in bytes.
    pub memory_usage_bytes: usize,
    /// Output compression ratio (1.0 = none).
    pub compression_ratio: f64,
}
impl Default for RdfXmlStreamingConfig {
    /// Defaults:
    ///
    /// * 64 KiB XML buffer.
    /// * Namespace depth limit 100 and element depth limit 1000.
    /// * Zero-copy decoding and parallel processing both enabled.
    /// * Batches of 1000 triples.
    /// * 1 MiB arena with a 512 MiB memory-pressure threshold.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        Self {
            xml_buffer_size: 64 * KIB,
            max_namespace_depth: 100,
            max_element_depth: 1000,
            enable_zero_copy: true,
            enable_parallel_processing: true,
            triple_batch_size: 1000,
            arena_size: MIB,
            memory_pressure_threshold: 512 * MIB,
        }
    }
}
impl DomFreeStreamingRdfXmlParser {
/// Builds a parser from `config`.
///
/// The namespace stack starts with a single default scope. The element stack
/// and bump arena are pre-sized from the configuration. The 50-slot buffer
/// pool and the zero-copy buffer are sized with `xml_buffer_size`.
pub fn new(config: RdfXmlStreamingConfig) -> Self {
    let xml_buffer_size = config.xml_buffer_size;
    let element_stack = Vec::with_capacity(config.max_element_depth);
    let arena = Bump::with_capacity(config.arena_size);
    Self {
        config,
        namespace_stack: vec![NamespaceContext::default()],
        element_stack,
        term_interner: Arc::new(StringInterner::with_capacity(100_000)),
        performance_monitor: Arc::new(RdfXmlPerformanceMonitor::new()),
        arena,
        buffer_pool: Arc::new(RdfXmlBufferPool::new(xml_buffer_size, 50)),
        simd_processor: SimdXmlProcessor::new(),
        zero_copy_buffer: ZeroCopyBuffer::new(xml_buffer_size),
    }
}
/// Borrows the SIMD helper the parser uses for UTF-8 validation, whitespace
/// trimming and qname splitting.
#[allow(dead_code)]
pub fn simd_processor(&self) -> &SimdXmlProcessor {
    &self.simd_processor
}
/// Parses RDF/XML from `reader` and streams the generated triples into `sink`.
///
/// XML events are pulled from `reader` one at a time. Triples accumulate in a
/// buffer and are forwarded in `TripleBatch`es over a bounded channel
/// (capacity 100) to a spawned task that drives `sink`. Once the input is
/// exhausted:
///
/// 1. The remaining triples are sent.
/// 2. The channel is closed.
/// 3. The sink task is awaited, so `flush_output` has run before this returns.
///
/// Events other than start/end/empty tags and text are ignored. That includes
/// comments, declarations, processing instructions and CDATA sections.
/// NOTE(review): dropping CDATA loses literal content written as CDATA.
///
/// NOTE(review): `reader` is a blocking `std::io::Read` polled inside an async
/// fn, so slow readers block the executor thread.
///
/// # Errors
///
/// Returns [`RdfXmlParseError`] in these cases:
///
/// * The XML is malformed.
/// * An element handler fails.
/// * The batch channel is closed early (e.g. because the sink task stopped).
/// * The sink task panics or is cancelled.
/// * The sink reports an error.
pub async fn stream_parse<R, S>(
    &mut self,
    reader: R,
    mut sink: S,
) -> Result<RdfXmlStreamingStatistics, RdfXmlParseError>
where
    R: std::io::Read,
    S: RdfXmlStreamingSink + 'static,
{
    let mut buf_reader = BufReader::new(reader);
    let mut xml_reader = XmlReader::from_reader(&mut buf_reader);
    xml_reader.config_mut().trim_text(true);
    let mut triple_buffer = Vec::with_capacity(self.config.triple_batch_size);
    // `tx` must remain the only sender. The sink task leaves its receive loop
    // only after every sender has been dropped. An additional clone kept alive
    // until the end of this function would make `sink_handle.await` below wait
    // forever.
    let (tx, mut rx) = mpsc::channel::<TripleBatch>(100);
    let sink_handle = tokio::spawn(async move {
        while let Some(batch) = rx.recv().await {
            sink.process_triple_stream(batch.triples).await?;
        }
        sink.flush_output().await?;
        Ok::<(), S::Error>(())
    });
    let mut buf = Vec::new();
    loop {
        match xml_reader.read_event_into(&mut buf) {
            Ok(Event::Start(ref e)) => {
                self.handle_start_element(e, &mut triple_buffer, &tx).await?;
            }
            Ok(Event::End(ref e)) => {
                self.handle_end_element(e, &mut triple_buffer, &tx).await?;
            }
            Ok(Event::Text(ref e)) => {
                self.handle_text_content(e, &mut triple_buffer, &tx).await?;
            }
            Ok(Event::Empty(ref e)) => {
                self.handle_empty_element(e, &mut triple_buffer, &tx).await?;
            }
            Ok(Event::Eof) => break,
            // Comments, declarations, PIs, doctype and CDATA are ignored.
            Ok(_) => {}
            Err(e) => return Err(RdfXmlParseError::XmlError(e.to_string())),
        }
        if self.should_cleanup_memory() {
            self.cleanup_memory().await;
        }
        buf.clear();
    }
    if !triple_buffer.is_empty() {
        let batch = TripleBatch {
            triples: triple_buffer,
        };
        tx.send(batch)
            .await
            .map_err(|_| RdfXmlParseError::XmlError("Channel send failed".to_string()))?;
    }
    // Dropping the only sender closes the channel and ends the sink's loop.
    drop(tx);
    sink_handle
        .await
        .map_err(|e| RdfXmlParseError::XmlError(format!("Sink task failed: {e}")))?
        .map_err(|e| RdfXmlParseError::XmlError(format!("Sink error: {e}")))?;
    Ok(self.performance_monitor.get_statistics())
}
async fn handle_start_element(
&mut self,
element: &BytesStart<'_>,
triple_buffer: &mut Vec<Triple>,
tx: &mpsc::Sender<TripleBatch>,
) -> Result<(), RdfXmlParseError> {
self.performance_monitor.record_element_processed();
let element_name = self.resolve_qname(element.name().as_ref())?;
let mut context = ElementContext::new();
context.element_type = self.classify_element(&element_name)?;
self.process_attributes_zero_copy(element.attributes(), &mut context)
.await?;
match context.element_type {
ElementType::RdfRoot => {
self.handle_rdf_root(&context).await?;
}
ElementType::Description => {
self.handle_description_element(&mut context, triple_buffer, tx)
.await?;
}
ElementType::Property => {
self.handle_property_element(&mut context, triple_buffer, tx)
.await?;
}
ElementType::Collection => {
self.handle_collection_element(&mut context, triple_buffer, tx)
.await?;
}
ElementType::ParseType(parse_type) => {
self.handle_parse_type_element(parse_type, &mut context, triple_buffer, tx)
.await?;
}
ElementType::Unknown => {
context.element_type = ElementType::Property;
self.handle_property_element(&mut context, triple_buffer, tx)
.await?;
}
}
self.element_stack.push(context);
Ok(())
}
/// Handles an XML end tag.
///
/// The innermost open element is popped and finalised, which may emit a
/// triple. When the buffer then holds at least `triple_batch_size` triples,
/// they are moved into a `TripleBatch` and sent to the sink task.
///
/// # Errors
///
/// Propagates finalisation errors. Reports `"Channel send failed"` if the sink
/// task is no longer receiving.
async fn handle_end_element(
    &mut self,
    _element: &BytesEnd<'_>,
    triple_buffer: &mut Vec<Triple>,
    tx: &mpsc::Sender<TripleBatch>,
) -> Result<(), RdfXmlParseError> {
    if let Some(closed) = self.element_stack.pop() {
        self.finalize_element_processing(closed, triple_buffer, tx)
            .await?;
    }
    if triple_buffer.len() < self.config.triple_batch_size {
        return Ok(());
    }
    let full_batch = std::mem::take(triple_buffer);
    tx.send(TripleBatch {
        triples: full_batch,
    })
    .await
    .map_err(|_| RdfXmlParseError::XmlError("Channel send failed".to_string()))
}
/// Handles character data inside an element.
///
/// The text is decoded in one of two ways:
///
/// * Through `process_text_zero_copy` when `enable_zero_copy` is set.
/// * Otherwise UTF-8 checked and entity-unescaped here.
///
/// When the innermost open element is a property element, the text becomes
/// that element's object literal. It is typed by `rdf:datatype` if present,
/// otherwise tagged with `xml:lang` if present, otherwise plain.
///
/// No triple is pushed here. `finalize_element_processing` emits
/// `(subject, predicate, object)` when the element closes. Pushing one here as
/// well would produce every literal-valued triple twice.
///
/// NOTE(review): `datatype.to_string()` uses `NamedNode`'s `Display` form.
/// Confirm it yields the bare IRI that `intern_literal_with_datatype` expects.
///
/// # Errors
///
/// Returns decoding errors and literal-construction errors from the interner.
async fn handle_text_content(
    &mut self,
    text: &BytesText<'_>,
    _triple_buffer: &mut Vec<Triple>,
    _tx: &mpsc::Sender<TripleBatch>,
) -> Result<(), RdfXmlParseError> {
    let text_content = if self.config.enable_zero_copy {
        self.process_text_zero_copy(text)?
    } else {
        unescape(
            std::str::from_utf8(text).map_err(|e| RdfXmlParseError::XmlError(e.to_string()))?,
        )
        .map_err(|e| RdfXmlParseError::XmlError(e.to_string()))?
        .into_owned()
    };
    let Some(context) = self.element_stack.last_mut() else {
        return Ok(());
    };
    if context.element_type != ElementType::Property {
        return Ok(());
    }
    let literal = if let Some(datatype) = &context.datatype {
        self.term_interner
            .intern_literal_with_datatype(&text_content, &datatype.to_string())?
    } else if let Some(language) = &context.language {
        self.term_interner
            .intern_literal_with_language(&text_content, language)?
    } else {
        self.term_interner.intern_literal(&text_content)?
    };
    // Only record the object; the triple is emitted once, on the end tag.
    context.object = Some(literal.into());
    Ok(())
}
/// Handles a self-closing tag (`<x/>`).
///
/// It is processed like a start tag and its context is immediately popped and
/// finalised. Unlike `handle_end_element`, no batch-size flush happens here.
async fn handle_empty_element(
    &mut self,
    element: &BytesStart<'_>,
    triple_buffer: &mut Vec<Triple>,
    tx: &mpsc::Sender<TripleBatch>,
) -> Result<(), RdfXmlParseError> {
    self.handle_start_element(element, triple_buffer, tx)
        .await?;
    let Some(just_opened) = self.element_stack.pop() else {
        return Ok(());
    };
    self.finalize_element_processing(just_opened, triple_buffer, tx)
        .await
}
/// Applies a start tag's attributes to `context` and to the namespace stack.
///
/// Names and values are decoded in one of two ways:
///
/// * Through the `*_zero_copy` helpers when `enable_zero_copy` is set.
/// * Otherwise through `quick_xml`: lossy UTF-8 for the name, `unescape_value`
///   for the value.
///
/// Recognised attributes (each also accepted without its prefix, e.g. `about`):
///
/// * `rdf:about` -> `context.subject` (named node, resolved against `xml:base`)
/// * `rdf:resource` -> `context.object` (named node, resolved against `xml:base`)
/// * `rdf:nodeID` -> `context.subject` (blank node)
/// * `rdf:datatype` -> `context.datatype`
/// * `xml:lang` -> `context.language`
/// * `rdf:parseType` -> `context.element_type = ElementType::ParseType(..)`
/// * `xmlns:p` / `xmlns` -> declared in the innermost namespace scope
///
/// Every other attribute is stored in `context.attributes` under its raw name.
///
/// NOTE(review): `rdf:nodeID` always sets the subject. On a property element,
/// RDF/XML uses it to name the object instead. Confirm the intended behavior.
/// NOTE(review): no namespace scope is pushed per element, so declarations
/// remain visible after the declaring element closes.
///
/// # Errors
///
/// Propagates these errors:
///
/// * Malformed attributes.
/// * Decoding errors.
/// * IRI or blank-node construction errors.
/// * Unknown `rdf:parseType` values.
async fn process_attributes_zero_copy(
    &mut self,
    attributes: Attributes<'_>,
    context: &mut ElementContext,
) -> Result<(), RdfXmlParseError> {
    for attr_result in attributes {
        let attr = attr_result.map_err(|e| RdfXmlParseError::XmlError(e.to_string()))?;
        let attr_name = if self.config.enable_zero_copy {
            self.process_attribute_name_zero_copy(attr.key.as_ref())?
        } else {
            String::from_utf8_lossy(attr.key.as_ref()).into_owned()
        };
        let attr_value = if self.config.enable_zero_copy {
            self.process_attribute_value_zero_copy(&attr_name, &attr.value)?
        } else {
            attr.unescape_value()
                .map_err(|e| RdfXmlParseError::XmlError(e.to_string()))?
                .into_owned()
        };
        match attr_name.as_str() {
            "rdf:about" | "about" => {
                let iri = self.resolve_uri(&attr_value)?;
                context.subject = Some(self.term_interner.intern_named_node(&iri)?.into());
            }
            "rdf:resource" | "resource" => {
                let iri = self.resolve_uri(&attr_value)?;
                context.object = Some(self.term_interner.intern_named_node(&iri)?.into());
            }
            "rdf:nodeID" | "nodeID" => {
                context.subject = Some(BlankNode::new(&attr_value)?.into());
            }
            "rdf:datatype" | "datatype" => {
                let iri = self.resolve_uri(&attr_value)?;
                context.datatype = Some(self.term_interner.intern_named_node(&iri)?);
            }
            "xml:lang" | "lang" => {
                context.language = Some(attr_value);
            }
            "rdf:parseType" | "parseType" => {
                // Overrides whatever classification the element name produced.
                context.element_type =
                    ElementType::ParseType(self.parse_parse_type(&attr_value)?);
            }
            _ => {
                // Namespace declarations update the namespace stack and are not
                // kept in `context.attributes`.
                if let Some(prefix) = attr_name.strip_prefix("xmlns:") {
                    self.declare_namespace(prefix, &attr_value)?;
                } else if attr_name == "xmlns" {
                    self.set_default_namespace(&attr_value)?;
                } else {
                    context.attributes.insert(attr_name, attr_value);
                }
            }
        }
    }
    Ok(())
}
/// Maps a qualified element name to an [`ElementType`].
///
/// Matching rules, checked in order:
///
/// 1. `rdf:RDF`, `rdf:Description` and the container names (`rdf:Bag`,
///    `rdf:Seq`, `rdf:Alt`) match exactly, with or without the `rdf:` prefix.
/// 2. Names accepted by `is_rdf_property` become properties.
/// 3. Names accepted by `is_rdf_type` become node elements.
/// 4. Everything else is `Unknown`.
///
/// Currently never returns `Err`.
fn classify_element(&self, element_name: &str) -> Result<ElementType, RdfXmlParseError> {
    let kind = match element_name {
        "rdf:RDF" | "RDF" => ElementType::RdfRoot,
        "rdf:Description" | "Description" => ElementType::Description,
        "rdf:Bag" | "rdf:Seq" | "rdf:Alt" | "Bag" | "Seq" | "Alt" => ElementType::Collection,
        other if self.is_rdf_property(other) => ElementType::Property,
        other if self.is_rdf_type(other) => ElementType::Description,
        _ => ElementType::Unknown,
    };
    Ok(kind)
}
/// Applies namespace and base-URI attributes found on the `rdf:RDF` root.
///
/// Handled attributes:
///
/// * `xmlns:p` binds a prefix.
/// * `xmlns` sets the default namespace.
/// * Any name starting with `xml:base` sets the base URI.
///
/// `process_attributes_zero_copy` does not store `xmlns*` attributes in
/// `context.attributes`, so in practice only `xml:base` reaches this loop.
async fn handle_rdf_root(&mut self, context: &ElementContext) -> Result<(), RdfXmlParseError> {
    for (name, value) in context.attributes.iter() {
        match name.strip_prefix("xmlns:") {
            Some(prefix) => self.declare_namespace(prefix, value)?,
            None if name == "xmlns" => self.set_default_namespace(value)?,
            None if name.starts_with("xml:base") => self.set_base_uri(value)?,
            None => {}
        }
    }
    Ok(())
}
/// Handles a node element (`rdf:Description` or a name recognised as a type).
///
/// The node always gets a subject: a fresh blank node is allocated when
/// neither `rdf:about` nor `rdf:nodeID` supplied one. Every remaining
/// attribute outside the `rdf:` and `xml:` prefixes becomes a triple:
///
/// * The predicate is the attribute's expanded qname.
/// * The object is a named node when the value looks like an IRI reference
///   (see `is_uri_reference`), and a plain literal otherwise.
///
/// NOTE(review): RDF/XML §2.5 says property attributes always produce literal
/// objects. The IRI heuristic deviates from that.
///
/// # Errors
///
/// Propagates qname-resolution and interning errors.
async fn handle_description_element(
    &mut self,
    context: &mut ElementContext,
    triple_buffer: &mut Vec<Triple>,
    _tx: &mpsc::Sender<TripleBatch>,
) -> Result<(), RdfXmlParseError> {
    if context.subject.is_none() {
        context.subject = Some(self.term_interner.intern_blank_node().into());
    }
    // The loop only reads `context`, so the attribute map is iterated in place
    // rather than copied for every node.
    for (name, value) in &context.attributes {
        if name.starts_with("rdf:") || name.starts_with("xml:") {
            continue;
        }
        let predicate_iri = self.resolve_qname(name.as_bytes())?;
        let predicate = self.term_interner.intern_named_node(&predicate_iri)?;
        let object: Term = if self.is_uri_reference(value) {
            let iri = self.resolve_uri(value)?;
            self.term_interner.intern_named_node(&iri)?.into()
        } else {
            self.term_interner.intern_literal(value)?.into()
        };
        let Some(subject) = &context.subject else {
            continue;
        };
        if let Ok(subject) = Subject::try_from(subject.clone()) {
            triple_buffer.push(Triple::new(subject, predicate, Object::from(object)));
        }
    }
    Ok(())
}
/// Prepares a property element's context.
///
/// The subject is inherited from the nearest enclosing description or
/// property element, when one is open. The predicate is set from
/// `get_current_element_name`, when that returns a name.
///
/// NOTE(review): `get_current_element_name` currently always returns `None`,
/// so no predicate is assigned here.
async fn handle_property_element(
    &mut self,
    context: &mut ElementContext,
    _triple_buffer: &mut [Triple],
    _tx: &mpsc::Sender<TripleBatch>,
) -> Result<(), RdfXmlParseError> {
    let inherited_subject = self
        .element_stack
        .iter()
        .rev()
        .find(|ancestor| {
            matches!(
                ancestor.element_type,
                ElementType::Description | ElementType::Property
            )
        })
        .map(|ancestor| ancestor.subject.clone());
    if let Some(subject) = inherited_subject {
        context.subject = subject;
    }
    let Some(name) = self.get_current_element_name() else {
        return Ok(());
    };
    let predicate_iri = self.resolve_qname(name.as_bytes())?;
    context.predicate = Some(self.term_interner.intern_named_node(&predicate_iri)?);
    Ok(())
}
/// Handles an `rdf:Bag` / `rdf:Seq` / `rdf:Alt` element.
///
/// A blank-node subject is allocated if none was given. Then
/// `(subject, rdf:type, <collection type>)` is emitted.
///
/// NOTE(review): `get_collection_type` returns `rdf:Bag` for every container,
/// so `rdf:Seq` and `rdf:Alt` are also typed as `rdf:Bag`.
async fn handle_collection_element(
    &mut self,
    context: &mut ElementContext,
    triple_buffer: &mut Vec<Triple>,
    _tx: &mpsc::Sender<TripleBatch>,
) -> Result<(), RdfXmlParseError> {
    if context.subject.is_none() {
        context.subject = Some(self.term_interner.intern_blank_node().into());
    }
    let Some(collection_type) = self.get_collection_type(&context.element_type) else {
        return Ok(());
    };
    let Some(subject_term) = context.subject.clone() else {
        return Ok(());
    };
    let rdf_type = self
        .term_interner
        .intern_named_node("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")?;
    let type_object: Object = self
        .term_interner
        .intern_named_node(collection_type)?
        .into();
    if let Ok(subject) = Subject::try_from(subject_term) {
        triple_buffer.push(Triple::new(subject, rdf_type, type_object));
    }
    Ok(())
}
/// Prepares an element carrying `rdf:parseType`.
///
/// * `Resource` and `Collection` get a fresh blank node as their object.
/// * `Literal` gets `rdf:XMLLiteral` as its datatype.
async fn handle_parse_type_element(
    &mut self,
    parse_type: ParseType,
    context: &mut ElementContext,
    _triple_buffer: &mut [Triple],
    _tx: &mpsc::Sender<TripleBatch>,
) -> Result<(), RdfXmlParseError> {
    match parse_type {
        ParseType::Resource | ParseType::Collection => {
            context.object = Some(self.term_interner.intern_blank_node().into());
        }
        ParseType::Literal => {
            let xml_literal = self.term_interner.intern_named_node(
                "http://www.w3.org/1999/02/22-rdf-syntax-ns#XMLLiteral",
            )?;
            context.datatype = Some(xml_literal);
        }
    }
    Ok(())
}
async fn finalize_element_processing(
&mut self,
context: ElementContext,
triple_buffer: &mut Vec<Triple>,
_tx: &mpsc::Sender<TripleBatch>,
) -> Result<(), RdfXmlParseError> {
if let (Some(subject), Some(predicate), Some(object)) =
(context.subject, context.predicate, context.object)
{
if let (Ok(subj), obj) = (Subject::try_from(subject), Object::from(object)) {
let triple = Triple::new(subj, predicate, obj);
triple_buffer.push(triple);
}
}
Ok(())
}
/// Decodes a text node into an owned, entity-unescaped `String`.
///
/// Steps:
///
/// 1. The raw bytes are UTF-8 validated.
/// 2. Surrounding whitespace is stripped with the SIMD helper.
/// 3. XML entity and character references (`&amp;`, `&#38;`, ...) are
///    expanded with `quick_xml::escape::unescape`.
///
/// The result is owned because it must outlive the reader's event buffer.
///
/// This is only called when `enable_zero_copy` is set. It must therefore
/// unescape just like the non-zero-copy path in `handle_text_content`,
/// otherwise `&amp;` would reach literals verbatim.
///
/// # Errors
///
/// * `XmlError("Invalid UTF-8 in text content")` for non-UTF-8 input.
/// * An `XmlError` describing a malformed or unknown entity reference.
fn process_text_zero_copy(&self, text: &BytesText<'_>) -> Result<String, RdfXmlParseError> {
    let raw_bytes = text.as_ref();
    if !self.simd_processor.is_valid_utf8(raw_bytes) {
        return Err(RdfXmlParseError::XmlError(
            "Invalid UTF-8 in text content".to_string(),
        ));
    }
    let trimmed = self.simd_processor.trim_whitespace(raw_bytes);
    // Checked conversion: no `unsafe`, and no reliance on the trimmer never
    // splitting a multi-byte character.
    let text_str = std::str::from_utf8(trimmed).map_err(|_| {
        RdfXmlParseError::XmlError("Invalid UTF-8 in text content".to_string())
    })?;
    Ok(unescape(text_str)
        .map_err(|e| RdfXmlParseError::XmlError(e.to_string()))?
        .into_owned())
}
/// Converts a raw attribute name to an owned `String`, rejecting invalid UTF-8.
///
/// Each call counts as one zero-copy operation in the performance monitor.
///
/// # Errors
///
/// `XmlError("Invalid UTF-8 in attribute name")` if `name` is not valid UTF-8.
fn process_attribute_name_zero_copy(&self, name: &[u8]) -> Result<String, RdfXmlParseError> {
    self.performance_monitor.record_zero_copy_operation();
    // The standard library's checked conversion is the authoritative UTF-8
    // check and keeps this free of `unsafe`.
    let name_str = std::str::from_utf8(name).map_err(|_| {
        RdfXmlParseError::XmlError("Invalid UTF-8 in attribute name".to_string())
    })?;
    Ok(name_str.to_owned())
}
fn process_attribute_value_zero_copy(
&self,
_name: &str,
value: &[u8],
) -> Result<String, RdfXmlParseError> {
self.performance_monitor.record_zero_copy_operation();
if !self.simd_processor.is_valid_utf8(value) {
return Err(RdfXmlParseError::XmlError(
"Invalid UTF-8 in attribute value".to_string(),
));
}
let trimmed = self.simd_processor.trim_whitespace(value);
let value_str = unsafe { std::str::from_utf8_unchecked(trimmed) };
Ok(value_str.to_string())
}
/// Expands a qualified name (`prefix:local` or `local`) into an IRI string.
///
/// * A prefixed name is looked up in the innermost namespace scope.
/// * An unprefixed name gets the default namespace prepended if one is set.
///   Otherwise it is returned as-is.
///
/// Non-UTF-8 bytes are replaced lossily.
///
/// # Errors
///
/// `UndefinedPrefix` when the prefix is not bound.
fn resolve_qname(&self, qname: &[u8]) -> Result<String, RdfXmlParseError> {
    let (prefix_bytes, local_bytes) = self.simd_processor.parse_qname(qname);
    if prefix_bytes.is_empty() {
        let bare = String::from_utf8_lossy(qname);
        return Ok(match self.get_default_namespace() {
            Some(default_ns) => format!("{default_ns}{bare}"),
            None => bare.into_owned(),
        });
    }
    let prefix = String::from_utf8_lossy(prefix_bytes);
    let local_name = String::from_utf8_lossy(local_bytes);
    self.get_namespace_uri(&prefix)
        .map(|namespace_uri| format!("{namespace_uri}{local_name}"))
        .ok_or_else(|| RdfXmlParseError::UndefinedPrefix(prefix.to_string()))
}
fn resolve_uri(&self, uri: &str) -> Result<String, RdfXmlParseError> {
if uri.starts_with("http://") || uri.starts_with("https://") {
Ok(uri.to_string())
} else if let Some(base_uri) = self.get_base_uri() {
Ok(format!("{base_uri}{uri}"))
} else {
Ok(uri.to_string())
}
}
/// Returns `true` for the RDF vocabulary properties `type`, `value`, `first`
/// and `rest`, written either bare or with an `rdf:` prefix.
fn is_rdf_property(&self, name: &str) -> bool {
    const RDF_PROPERTY_NAMES: [&str; 4] = ["type", "value", "first", "rest"];
    let local = name.strip_prefix("rdf:").unwrap_or(name);
    RDF_PROPERTY_NAMES.contains(&local)
}
/// Heuristic: treats a name as a type (node element) if it is
/// `rdf:Resource` / `Resource`, or contains `Class` or `Property` anywhere.
fn is_rdf_type(&self, name: &str) -> bool {
    matches!(name, "rdf:Resource" | "Resource")
        || ["Class", "Property"]
            .iter()
            .any(|&needle| name.contains(needle))
}
/// Heuristic: an attribute value looks like an IRI reference when it starts
/// with `http://`, `https://`, `#`, `../` or `./`.
fn is_uri_reference(&self, value: &str) -> bool {
    const IRI_LIKE_PREFIXES: [&str; 5] = ["http://", "https://", "#", "../", "./"];
    IRI_LIKE_PREFIXES
        .iter()
        .any(|&prefix| value.starts_with(prefix))
}
/// Parses an `rdf:parseType` attribute value (case-sensitive).
///
/// NOTE(review): the RDF/XML grammar treats unrecognised parseType values like
/// `Literal`. Confirm that rejecting them here is intended.
///
/// # Errors
///
/// `InvalidParseType` for anything other than `Resource`, `Collection` or
/// `Literal`.
fn parse_parse_type(&self, parse_type: &str) -> Result<ParseType, RdfXmlParseError> {
    let parsed = match parse_type {
        "Resource" => ParseType::Resource,
        "Collection" => ParseType::Collection,
        "Literal" => ParseType::Literal,
        unknown => return Err(RdfXmlParseError::InvalidParseType(unknown.to_string())),
    };
    Ok(parsed)
}
/// Returns the `rdf:type` IRI for a container element, or `None` for any
/// other element type.
///
/// All containers map to `rdf:Bag`; the element type carries no Bag/Seq/Alt
/// distinction.
fn get_collection_type(&self, element_type: &ElementType) -> Option<&'static str> {
    matches!(element_type, ElementType::Collection)
        .then_some("http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag")
}
/// Looks up `prefix` in the innermost namespace scope.
fn get_namespace_uri(&self, prefix: &str) -> Option<String> {
    self.namespace_stack
        .last()
        .and_then(|scope| scope.prefixes.get(prefix))
        .cloned()
}
/// Returns the default namespace of the innermost scope, if any.
fn get_default_namespace(&self) -> Option<String> {
    self.namespace_stack
        .last()
        .and_then(|scope| scope.default_namespace.clone())
}
/// Returns the `xml:base` of the innermost scope, if any.
fn get_base_uri(&self) -> Option<String> {
    self.namespace_stack
        .last()
        .and_then(|scope| scope.base_uri.clone())
}
/// Binds `prefix` to `namespace` in the innermost scope, replacing any
/// previous binding. Does nothing if the stack is empty; never returns `Err`.
fn declare_namespace(&mut self, prefix: &str, namespace: &str) -> Result<(), RdfXmlParseError> {
    let Some(scope) = self.namespace_stack.last_mut() else {
        return Ok(());
    };
    scope
        .prefixes
        .insert(prefix.to_owned(), namespace.to_owned());
    Ok(())
}
/// Sets the default namespace of the innermost scope. Does nothing if the
/// stack is empty; never returns `Err`.
fn set_default_namespace(&mut self, namespace: &str) -> Result<(), RdfXmlParseError> {
    let Some(scope) = self.namespace_stack.last_mut() else {
        return Ok(());
    };
    scope.default_namespace = Some(namespace.to_owned());
    Ok(())
}
/// Sets the `xml:base` of the innermost scope. Does nothing if the stack is
/// empty; never returns `Err`.
fn set_base_uri(&mut self, base_uri: &str) -> Result<(), RdfXmlParseError> {
    let Some(scope) = self.namespace_stack.last_mut() else {
        return Ok(());
    };
    scope.base_uri = Some(base_uri.to_owned());
    Ok(())
}
/// Intended to return the qualified name of the element being opened, so
/// that `handle_property_element` can use it as the predicate.
///
/// NOTE(review): stub. It always returns `None`, so `handle_property_element`
/// never assigns `context.predicate`. `ElementContext` does not record element
/// names, so there is currently nothing to look up.
fn get_current_element_name(&self) -> Option<String> {
    None
}
/// Returns `true` once the bump arena has allocated more than
/// `memory_pressure_threshold` bytes.
fn should_cleanup_memory(&self) -> bool {
    let arena_bytes = self.arena.allocated_bytes();
    self.config.memory_pressure_threshold < arena_bytes
}
/// Releases all arena allocations at once and notifies the performance
/// monitor that a cleanup happened.
async fn cleanup_memory(&mut self) {
    // Bump::reset frees every arena allocation in one step.
    self.arena.reset();
    self.performance_monitor.record_memory_cleanup();
}
}
/// A batch of triples sent from the parser to the sink task over the channel.
#[derive(Debug)]
struct TripleBatch {
    /// Triples in the order they were buffered.
    triples: Vec<Triple>,
}
/// Snapshot of parser counters returned by `stream_parse`.
#[derive(Debug, Clone)]
pub struct RdfXmlStreamingStatistics {
    /// Start tags seen, including self-closing tags.
    pub elements_processed: usize,
    /// NOTE(review): backed by a counter that nothing in this file increments.
    pub triples_generated: usize,
    /// NOTE(review): backed by a counter that nothing in this file increments.
    pub namespace_lookups: usize,
    /// Attribute names/values decoded through the zero-copy helpers.
    pub zero_copy_operations: usize,
    /// NOTE(review): backed by a counter that nothing in this file increments.
    pub parse_errors: usize,
    /// Time elapsed since the performance monitor was created.
    pub processing_time: Duration,
    /// Always reported as 0 by the monitor in this file.
    pub memory_usage_bytes: usize,
    /// `elements_processed` divided by elapsed seconds.
    pub throughput_elements_per_second: f64,
}
impl Default for NamespaceContext {
    /// Creates a root scope with the `rdf`, `rdfs` and `xsd` prefixes bound.
    /// It has no default namespace and no base URI.
    fn default() -> Self {
        let prefixes = [
            ("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#"),
            ("rdfs", "http://www.w3.org/2000/01/rdf-schema#"),
            ("xsd", "http://www.w3.org/2001/XMLSchema#"),
        ]
        .into_iter()
        .map(|(prefix, iri)| (prefix.to_string(), iri.to_string()))
        .collect::<HashMap<String, String>>();
        Self {
            prefixes,
            default_namespace: None,
            base_uri: None,
        }
    }
}
impl ElementContext {
    /// Creates an empty context of type `Unknown`.
    ///
    /// Subject, predicate, object, datatype and language are all unset, and
    /// there are no extra attributes.
    fn new() -> Self {
        let attributes = HashMap::new();
        Self {
            attributes,
            language: None,
            datatype: None,
            object: None,
            predicate: None,
            subject: None,
            element_type: ElementType::Unknown,
        }
    }
}
impl RdfXmlPerformanceMonitor {
    /// Creates a monitor with every counter at zero; the clock starts now.
    fn new() -> Self {
        Self {
            elements_processed: AtomicUsize::new(0),
            triples_generated: AtomicUsize::new(0),
            namespace_lookups: AtomicUsize::new(0),
            zero_copy_operations: AtomicUsize::new(0),
            memory_allocations: AtomicUsize::new(0),
            parse_errors: AtomicUsize::new(0),
            start_time: Instant::now(),
            processing_times: Arc::new(ParkingLotMutex::new(VecDeque::with_capacity(1000))),
        }
    }

    /// Counts one processed element.
    fn record_element_processed(&self) {
        self.elements_processed.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` to the generated-triples counter.
    #[allow(dead_code)]
    fn record_triples_generated(&self, count: usize) {
        self.triples_generated.fetch_add(count, Ordering::Relaxed);
    }

    /// Counts one zero-copy decode.
    fn record_zero_copy_operation(&self) {
        self.zero_copy_operations.fetch_add(1, Ordering::Relaxed);
    }

    /// Hook called after the parser resets its arena.
    ///
    /// NOTE(review): currently a no-op; no counter tracks cleanups.
    fn record_memory_cleanup(&self) {}

    /// Snapshots all counters into a [`RdfXmlStreamingStatistics`].
    ///
    /// Throughput is elements per (fractional) second since the monitor was
    /// created. It is `0.0` only when no measurable time has elapsed.
    /// `memory_usage_bytes` is always 0 because memory is not tracked.
    fn get_statistics(&self) -> RdfXmlStreamingStatistics {
        let elapsed = self.start_time.elapsed();
        let elements = self.elements_processed.load(Ordering::Relaxed);
        let triples = self.triples_generated.load(Ordering::Relaxed);
        let namespace_lookups = self.namespace_lookups.load(Ordering::Relaxed);
        let zero_copy_ops = self.zero_copy_operations.load(Ordering::Relaxed);
        let errors = self.parse_errors.load(Ordering::Relaxed);
        // Use the fractional duration: gating on whole seconds reported zero
        // throughput for every run shorter than one second.
        let elapsed_secs = elapsed.as_secs_f64();
        let throughput = if elapsed_secs > 0.0 {
            elements as f64 / elapsed_secs
        } else {
            0.0
        };
        RdfXmlStreamingStatistics {
            elements_processed: elements,
            triples_generated: triples,
            namespace_lookups,
            zero_copy_operations: zero_copy_ops,
            parse_errors: errors,
            processing_time: elapsed,
            memory_usage_bytes: 0,
            throughput_elements_per_second: throughput,
        }
    }
}
impl RdfXmlBufferPool {
    /// Creates an empty pool.
    ///
    /// At most `max_buffers` returned buffers are retained. Freshly allocated
    /// buffers get `buffer_size` bytes of capacity.
    fn new(buffer_size: usize, max_buffers: usize) -> Self {
        let xml_buffers = Arc::new(Mutex::new(Vec::with_capacity(max_buffers)));
        let string_buffers = Arc::new(Mutex::new(Vec::with_capacity(max_buffers)));
        Self {
            xml_buffers,
            string_buffers,
            max_buffers,
            buffer_size,
        }
    }

    /// Takes a pooled byte buffer. When the pool is empty, a new one with
    /// `buffer_size` capacity is allocated.
    ///
    /// # Panics
    ///
    /// Panics if the pool mutex is poisoned.
    #[allow(dead_code)]
    fn get_xml_buffer(&self) -> Vec<u8> {
        let mut pooled = self
            .xml_buffers
            .lock()
            .expect("lock should not be poisoned");
        match pooled.pop() {
            Some(buffer) => buffer,
            None => Vec::with_capacity(self.buffer_size),
        }
    }

    /// Clears `buffer` and keeps it for reuse. If the pool already holds
    /// `max_buffers` buffers, `buffer` is dropped instead.
    ///
    /// # Panics
    ///
    /// Panics if the pool mutex is poisoned.
    #[allow(dead_code)]
    fn return_xml_buffer(&self, mut buffer: Vec<u8>) {
        buffer.clear();
        let mut pooled = self
            .xml_buffers
            .lock()
            .expect("lock should not be poisoned");
        if pooled.len() >= self.max_buffers {
            return;
        }
        pooled.push(buffer);
    }
}
/// [`RdfXmlStreamingSink`] that collects triples and namespace bindings in
/// memory behind mutexes.
pub struct MemoryRdfXmlSink {
    /// All triples received so far, in arrival order.
    triples: Arc<Mutex<Vec<Triple>>>,
    /// Prefix -> namespace bindings received so far.
    namespaces: Arc<Mutex<HashMap<String, String>>>,
    /// Running counters returned by `get_statistics`.
    statistics: Arc<Mutex<RdfXmlSinkStatistics>>,
}
impl Default for MemoryRdfXmlSink {
fn default() -> Self {
Self::new()
}
}
impl MemoryRdfXmlSink {
    /// Creates an empty sink: no triples or namespaces, all counters zero and
    /// a compression ratio of 1.0.
    pub fn new() -> Self {
        let initial_stats = RdfXmlSinkStatistics {
            triples_processed: 0,
            namespaces_declared: 0,
            processing_rate_tps: 0.0,
            memory_usage_bytes: 0,
            compression_ratio: 1.0,
        };
        Self {
            triples: Arc::new(Mutex::new(Vec::new())),
            namespaces: Arc::new(Mutex::new(HashMap::new())),
            statistics: Arc::new(Mutex::new(initial_stats)),
        }
    }

    /// Returns a copy of every triple collected so far.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn get_triples(&self) -> Vec<Triple> {
        let guard = self.triples.lock().expect("lock should not be poisoned");
        let snapshot = (*guard).clone();
        snapshot
    }

    /// Returns a copy of every namespace binding collected so far.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn get_namespaces(&self) -> HashMap<String, String> {
        let guard = self.namespaces.lock().expect("lock should not be poisoned");
        let snapshot = (*guard).clone();
        snapshot
    }
}
/// Error type of [`MemoryRdfXmlSink`], carrying a human-readable message.
#[derive(Debug)]
pub struct StreamingSinkError(String);

impl std::fmt::Display for StreamingSinkError {
    /// Formats as `Streaming sink error: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let StreamingSinkError(message) = self;
        write!(f, "Streaming sink error: {message}")
    }
}

impl std::error::Error for StreamingSinkError {}

impl StreamingSinkError {
    /// Wraps `message` in a new error.
    pub fn new(message: String) -> Self {
        StreamingSinkError(message)
    }
}
impl RdfXmlStreamingSink for MemoryRdfXmlSink {
    type Error = StreamingSinkError;

    /// Appends the batch to the stored triples and adds its length to
    /// `triples_processed`. Never fails.
    async fn process_triple_stream(&mut self, triples: Vec<Triple>) -> Result<(), Self::Error> {
        let batch_len = triples.len();
        self.triples
            .lock()
            .expect("lock should not be poisoned")
            .extend(triples);
        self.statistics
            .lock()
            .expect("lock should not be poisoned")
            .triples_processed += batch_len;
        Ok(())
    }

    /// Stores (or overwrites) the binding for `prefix` and increments
    /// `namespaces_declared`. Never fails.
    fn process_namespace_declaration(
        &mut self,
        prefix: &str,
        namespace: &str,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send {
        // Copy the borrowed arguments now so the future owns them.
        let binding = (prefix.to_string(), namespace.to_string());
        async move {
            let (prefix, namespace) = binding;
            self.namespaces
                .lock()
                .expect("lock should not be poisoned")
                .insert(prefix, namespace);
            self.statistics
                .lock()
                .expect("lock should not be poisoned")
                .namespaces_declared += 1;
            Ok(())
        }
    }

    /// Everything is already held in memory, so there is nothing to flush.
    async fn flush_output(&mut self) -> Result<(), Self::Error> {
        Ok(())
    }

    /// Returns a copy of the current counters.
    fn get_statistics(&self) -> RdfXmlSinkStatistics {
        let guard = self.statistics.lock().expect("lock should not be poisoned");
        (*guard).clone()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Smoke test: parses a small FOAF document through `stream_parse`.
    ///
    /// Only the returned statistics are checked: some elements were counted
    /// and some time elapsed. The collected triples are not inspected.
    ///
    /// NOTE(review): marked `#[ignore]`; the reason is not recorded here.
    /// Confirm whether it can be re-enabled.
    #[tokio::test]
    #[ignore]
    async fn test_dom_free_streaming_parser() {
        use std::io::Cursor;
        let rdfxml_data = r#"<?xml version="1.0"?>
<rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns:foaf="http://xmlns.com/foaf/0.1/">
<foaf:Person rdf:about="http://example.org/person/alice">
<foaf:name>Alice</foaf:name>
<foaf:age>30</foaf:age>
</foaf:Person>
</rdf:RDF>"#;
        let config = RdfXmlStreamingConfig::default();
        let mut parser = DomFreeStreamingRdfXmlParser::new(config);
        let reader = Cursor::new(rdfxml_data.as_bytes());
        let sink = MemoryRdfXmlSink::new();
        let stats = parser
            .stream_parse(reader, sink)
            .await
            .expect("async operation should succeed");
        assert!(stats.elements_processed > 0);
        assert!(stats.processing_time.as_nanos() > 0);
    }
}