use crate::{
jsonld::JsonLdParseError,
model::{NamedNode, Object, Predicate, Quad, Subject, Triple},
optimization::{SimdJsonProcessor, TermInterner, TermInternerExt, ZeroCopyBuffer},
};
use dashmap::DashMap;
use parking_lot::Mutex;
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use serde_json::{Map, Value};
use std::{
collections::VecDeque,
error::Error as StdError,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use tokio::{
io::{AsyncRead, AsyncReadExt, BufReader},
sync::{mpsc, RwLock, Semaphore},
time::{Duration, Instant},
};
/// High-throughput streaming JSON-LD parser combining a shared `@context`
/// cache, RDF term interning, SIMD JSON parsing, and pooled read buffers.
pub struct UltraStreamingJsonLdParser {
    /// Tuning knobs for chunking, concurrency, and buffering.
    config: StreamingConfig,
    /// Resolved `@context` documents keyed by context IRI; shared across
    /// the reader and processing tasks.
    context_cache: Arc<DashMap<String, Arc<Value>>>,
    /// Interner used to deduplicate RDF terms during triple extraction.
    term_interner: Arc<TermInterner>,
    /// Counters for bytes/triples/errors, snapshotted into
    /// `StreamingStatistics` at the end of a parse.
    performance_monitor: Arc<PerformanceMonitor>,
    /// SIMD-accelerated JSON parser, used when `config.enable_simd` is set.
    simd_processor: SimdJsonProcessor,
    /// Capped pool of reusable read buffers for chunked input.
    buffer_pool: Arc<BufferPool>,
}
/// Configuration for [`UltraStreamingJsonLdParser`].
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Target read-chunk size in bytes; adapted at runtime by
    /// `adjust_chunk_size_adaptive`.
    pub chunk_size: usize,
    /// Number of permits on the chunk-processing semaphore.
    pub max_concurrent_threads: usize,
    /// Pooled buffer size in bytes. NOTE(review): also reused as the chunk
    /// channel capacity and the triple batch-size threshold in
    /// `stream_parse` — confirm that double duty is intentional.
    pub buffer_size: usize,
    /// Use the SIMD JSON path instead of `serde_json` when true.
    pub enable_simd: bool,
    /// Initial capacity of the `@context` cache.
    pub context_cache_size: usize,
    /// Fraction of `chunk_size` below which a short read triggers adaptive
    /// resizing (see `should_adjust_chunk_size`).
    pub adaptive_threshold: f64,
    /// Memory ceiling in bytes. NOTE(review): not read by any code in this
    /// file — verify it is consumed elsewhere.
    pub memory_pressure_threshold: usize,
    /// Requested zero-copy aggressiveness.
    pub zero_copy_level: ZeroCopyLevel,
    /// Enable collection of profiling data.
    pub enable_profiling: bool,
}
/// How aggressively the parser should avoid copying input data.
///
/// NOTE(review): this value is only stored and compared in this file; no
/// code here branches on individual variants — confirm consumers elsewhere.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ZeroCopyLevel {
    /// No zero-copy optimizations.
    None,
    /// Basic zero-copy optimizations.
    Basic,
    /// Advanced zero-copy optimizations (the default configuration).
    Advanced,
    /// Maximum zero-copy aggressiveness.
    Maximum,
}
/// Thread-safe counters describing a parse run.
///
/// All counters are independent relaxed atomics; they are only read back as
/// a snapshot in `get_statistics`.
pub struct PerformanceMonitor {
    // Bytes read from the input stream.
    total_bytes_processed: AtomicUsize,
    // Triples produced by the extraction stage.
    total_triples_parsed: AtomicUsize,
    // Parse failures. NOTE(review): never incremented in this file.
    parse_errors: AtomicUsize,
    // Context cache hit/miss tallies. NOTE(review): never incremented in
    // this file — verify recording sites exist elsewhere.
    context_cache_hits: AtomicUsize,
    context_cache_misses: AtomicUsize,
    // SIMD / zero-copy operation tallies; reported in statistics.
    simd_operations: AtomicUsize,
    zero_copy_operations: AtomicUsize,
    // Wall-clock start of the run, for elapsed/throughput computation.
    start_time: Instant,
    // Sliding window of per-chunk processing durations.
    chunk_processing_times: Arc<Mutex<VecDeque<Duration>>>,
}
/// Capped pool of reusable [`ZeroCopyBuffer`]s.
///
/// Buffers are allocated lazily up to `max_buffers`; callers wait when the
/// pool is exhausted.
pub struct BufferPool {
    // Free list of buffers ready for reuse.
    available_buffers: Arc<Mutex<Vec<ZeroCopyBuffer>>>,
    // Size in bytes of each buffer created by this pool.
    buffer_size: usize,
    // Hard cap on total buffers in existence.
    max_buffers: usize,
    // Count of buffers currently allocated (pooled or in use).
    current_buffers: AtomicUsize,
}
/// Destination for parsed output, fed in batches by
/// `UltraStreamingJsonLdParser::stream_parse`.
#[async_trait::async_trait]
pub trait StreamingSink: Send + Sync {
    /// Error type surfaced to the parser pipeline.
    type Error: Send + Sync + std::error::Error + 'static;
    /// Consumes one batch of triples.
    async fn process_triple_batch(&mut self, triples: Vec<Triple>) -> Result<(), Self::Error>;
    /// Consumes one batch of quads.
    async fn process_quad_batch(&mut self, quads: Vec<Quad>) -> Result<(), Self::Error>;
    /// Flushes buffered output; called once after the input is exhausted.
    async fn flush(&mut self) -> Result<(), Self::Error>;
    /// Returns a snapshot of the sink's own processing statistics.
    fn performance_statistics(&self) -> SinkStatistics;
}
/// Snapshot of a sink's processing counters.
#[derive(Debug, Clone)]
pub struct SinkStatistics {
    /// Total triples the sink has accepted.
    pub total_triples_processed: usize,
    /// Total quads the sink has accepted.
    pub total_quads_processed: usize,
    /// Running batch-size figure. NOTE(review): the in-memory sink updates
    /// this as `(old + new) / 2`, i.e. an exponential moving average, not a
    /// true mean.
    pub average_batch_size: f64,
    /// Processing rate. NOTE(review): never updated in this file.
    pub processing_rate_per_second: f64,
    /// Memory footprint estimate. NOTE(review): never updated in this file.
    pub memory_usage_bytes: usize,
}
impl Default for StreamingConfig {
fn default() -> Self {
Self {
chunk_size: 64 * 1024, max_concurrent_threads: num_cpus::get() * 2,
buffer_size: 1024 * 1024, enable_simd: true,
context_cache_size: 10000,
adaptive_threshold: 0.8,
memory_pressure_threshold: 8 * 1024 * 1024 * 1024, zero_copy_level: ZeroCopyLevel::Advanced,
enable_profiling: true,
}
}
}
impl UltraStreamingJsonLdParser {
pub fn new(config: StreamingConfig) -> Self {
Self {
context_cache: Arc::new(DashMap::with_capacity(config.context_cache_size)),
term_interner: Arc::new(TermInterner::new()),
performance_monitor: Arc::new(PerformanceMonitor::new()),
simd_processor: SimdJsonProcessor::new(),
buffer_pool: Arc::new(BufferPool::new(config.buffer_size, 100)),
config,
}
}
pub async fn stream_parse<R, S>(
&mut self,
reader: R,
mut sink: S,
) -> Result<StreamingStatistics, JsonLdParseError>
where
R: AsyncRead + Unpin + Send + 'static,
S: StreamingSink + Send + 'static,
S::Error: 'static,
{
let mut buf_reader = BufReader::with_capacity(self.config.chunk_size, reader);
let (tx, mut rx) = mpsc::channel::<ProcessingChunk>(self.config.buffer_size);
let (triple_tx, mut triple_rx) = mpsc::channel::<Vec<Triple>>(100);
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_threads));
let sink_handle = tokio::spawn(async move {
while let Some(batch) = triple_rx.recv().await {
sink.process_triple_batch(batch)
.await
.map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;
}
sink.flush()
.await
.map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;
Ok::<(), JsonLdParseError>(())
});
let processing_handle = tokio::spawn({
let config = self.config.clone();
let context_cache = Arc::clone(&self.context_cache);
let term_interner = Arc::clone(&self.term_interner);
let performance_monitor = Arc::clone(&self.performance_monitor);
let simd_processor = self.simd_processor.clone();
let triple_tx = triple_tx.clone();
async move {
let mut batch_buffer = Vec::with_capacity(config.buffer_size);
while let Some(chunk) = rx.recv().await {
let _permit = semaphore
.acquire()
.await
.expect("semaphore should not be closed");
let processed_triples = if config.enable_simd {
Self::process_chunk_simd(
chunk,
&context_cache,
&term_interner,
&simd_processor,
)
.await?
} else {
Self::process_chunk_standard(chunk, &context_cache, &term_interner).await?
};
performance_monitor.record_triples_parsed(processed_triples.len());
batch_buffer.extend(processed_triples);
if batch_buffer.len() >= config.buffer_size
|| performance_monitor.should_flush_batch()
{
triple_tx
.send(std::mem::take(&mut batch_buffer))
.await
.map_err(|_| {
JsonLdParseError::ProcessingError(
"Triple channel send failed".to_string(),
)
})?;
}
}
if !batch_buffer.is_empty() {
triple_tx.send(batch_buffer).await.map_err(|_| {
JsonLdParseError::ProcessingError("Triple channel send failed".to_string())
})?;
}
Ok::<(), JsonLdParseError>(())
}
});
let mut buffer = self.buffer_pool.get_buffer().await;
let mut total_bytes = 0;
loop {
match buf_reader.read(buffer.as_mut_slice()).await {
Ok(0) => break, Ok(n) => {
buffer.set_len(n);
total_bytes += n;
self.performance_monitor.record_bytes_processed(n);
if self.should_adjust_chunk_size(n) {
self.adjust_chunk_size_adaptive().await;
}
let chunk = ProcessingChunk {
data: buffer.as_slice().to_vec(),
timestamp: Instant::now(),
sequence_id: total_bytes,
};
tx.send(chunk).await.map_err(|_| {
JsonLdParseError::ProcessingError("Channel send failed".to_string())
})?;
buffer = self.buffer_pool.get_buffer().await;
}
Err(e) => return Err(JsonLdParseError::Io(e)),
}
}
drop(tx); processing_handle
.await
.map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))??;
drop(triple_tx); sink_handle
.await
.map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))??;
Ok(self.performance_monitor.get_statistics())
}
async fn process_chunk_simd(
chunk: ProcessingChunk,
context_cache: &DashMap<String, Arc<Value>>,
term_interner: &TermInterner,
simd_processor: &SimdJsonProcessor,
) -> Result<Vec<Triple>, JsonLdParseError> {
let start = Instant::now();
let json_value = simd_processor
.parse_json(&chunk.data)
.map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;
let context = Self::resolve_context_zero_copy(&json_value, context_cache).await?;
#[cfg(feature = "parallel")]
let triples = Self::extract_triples_parallel(&json_value, &context, term_interner).await?;
#[cfg(not(feature = "parallel"))]
let triples = Self::extract_triples_standard(&json_value, &context, term_interner).await?;
let _processing_time = start.elapsed();
Ok(triples)
}
async fn process_chunk_standard(
chunk: ProcessingChunk,
context_cache: &DashMap<String, Arc<Value>>,
term_interner: &TermInterner,
) -> Result<Vec<Triple>, JsonLdParseError> {
let json_value: Value = serde_json::from_slice(&chunk.data)
.map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;
let context = Self::resolve_context_cached(&json_value, context_cache).await?;
let triples = Self::extract_triples_standard(&json_value, &context, term_interner).await?;
Ok(triples)
}
async fn resolve_context_zero_copy(
json_value: &Value,
context_cache: &DashMap<String, Arc<Value>>,
) -> Result<Arc<Value>, JsonLdParseError> {
if let Some(context_ref) = json_value.get("@context") {
if let Some(context_str) = context_ref.as_str() {
if let Some(cached_context) = context_cache.get(context_str) {
return Ok(Arc::clone(&cached_context));
}
let resolved_context = Self::resolve_remote_context(context_str).await?;
let context_arc = Arc::new(resolved_context);
context_cache.insert(context_str.to_string(), Arc::clone(&context_arc));
return Ok(context_arc);
}
}
Ok(Arc::new(Value::Object(Map::new())))
}
async fn resolve_context_cached(
json_value: &Value,
context_cache: &DashMap<String, Arc<Value>>,
) -> Result<Arc<Value>, JsonLdParseError> {
Self::resolve_context_zero_copy(json_value, context_cache).await
}
#[cfg(feature = "parallel")]
async fn extract_triples_parallel(
json_value: &Value,
context: &Value,
term_interner: &TermInterner,
) -> Result<Vec<Triple>, JsonLdParseError> {
if let Value::Array(objects) = json_value {
let triples: Result<Vec<Vec<Triple>>, JsonLdParseError> = objects
.par_iter()
.map(|obj| Self::extract_triples_from_object(obj, context, term_interner))
.collect();
Ok(triples?.into_iter().flatten().collect())
} else {
Self::extract_triples_from_object(json_value, context, term_interner)
}
}
async fn extract_triples_standard(
json_value: &Value,
context: &Value,
term_interner: &TermInterner,
) -> Result<Vec<Triple>, JsonLdParseError> {
Self::extract_triples_from_object(json_value, context, term_interner)
}
fn extract_triples_from_object(
obj: &Value,
context: &Value,
term_interner: &TermInterner,
) -> Result<Vec<Triple>, JsonLdParseError> {
let mut triples = Vec::new();
if let Value::Object(map) = obj {
let subject: Subject = if let Some(id) = map.get("@id") {
Subject::NamedNode(term_interner.intern_named_node(id.as_str().ok_or_else(
|| JsonLdParseError::ProcessingError("Invalid @id".to_string()),
)?)?)
} else {
Subject::BlankNode(term_interner.intern_blank_node())
};
for (key, value) in map {
if key.starts_with('@') {
continue; }
let predicate_iri = Self::expand_property(key, context)?;
let predicate = term_interner.intern_named_node(&predicate_iri)?;
match value {
Value::Array(values) => {
for val in values {
if let Some(triple) = Self::create_triple_from_value(
subject.clone(),
predicate.clone(),
val,
context,
term_interner,
)? {
triples.push(triple);
}
}
}
_ => {
if let Some(triple) = Self::create_triple_from_value(
subject.clone(),
predicate,
value,
context,
term_interner,
)? {
triples.push(triple);
}
}
}
}
}
Ok(triples)
}
fn create_triple_from_value(
subject: Subject,
predicate: NamedNode,
value: &Value,
_context: &Value,
term_interner: &TermInterner,
) -> Result<Option<Triple>, JsonLdParseError> {
let object: Object = match value {
Value::String(s) => {
if s.starts_with("http://") || s.starts_with("https://") {
Object::NamedNode(term_interner.intern_named_node(s)?)
} else {
Object::Literal(term_interner.intern_literal(s)?)
}
}
Value::Object(obj) => {
if let Some(id) = obj.get("@id") {
Object::NamedNode(term_interner.intern_named_node(id.as_str().ok_or_else(
|| JsonLdParseError::ProcessingError("Invalid @id in object".to_string()),
)?)?)
} else if let Some(val) = obj.get("@value") {
let literal_value = val.as_str().ok_or_else(|| {
JsonLdParseError::ProcessingError("Invalid @value".to_string())
})?;
if let Some(datatype) = obj.get("@type") {
let datatype_iri = datatype.as_str().ok_or_else(|| {
JsonLdParseError::ProcessingError("Invalid @type".to_string())
})?;
Object::Literal(
term_interner
.intern_literal_with_datatype(literal_value, datatype_iri)?,
)
} else if let Some(lang) = obj.get("@language") {
let language = lang.as_str().ok_or_else(|| {
JsonLdParseError::ProcessingError("Invalid @language".to_string())
})?;
Object::Literal(
term_interner.intern_literal_with_language(literal_value, language)?,
)
} else {
Object::Literal(term_interner.intern_literal(literal_value)?)
}
} else {
return Ok(None); }
}
Value::Number(n) => Object::Literal(term_interner.intern_literal(&n.to_string())?),
Value::Bool(b) => Object::Literal(term_interner.intern_literal(&b.to_string())?),
_ => return Ok(None),
};
Ok(Some(Triple::new(
subject,
Predicate::NamedNode(predicate),
object,
)))
}
fn expand_property(property: &str, context: &Value) -> Result<String, JsonLdParseError> {
if property.contains(':') {
Ok(property.to_string())
} else if let Value::Object(ctx) = context {
if let Some(expanded) = ctx.get(property) {
if let Some(iri) = expanded.as_str() {
Ok(iri.to_string())
} else {
Ok(format!("http://example.org/{property}"))
}
} else {
Ok(format!("http://example.org/{property}"))
}
} else {
Ok(format!("http://example.org/{property}"))
}
}
async fn resolve_remote_context(_context_iri: &str) -> Result<Value, JsonLdParseError> {
Ok(Value::Object(Map::new()))
}
fn should_adjust_chunk_size(&self, bytes_read: usize) -> bool {
let target_size = self.config.chunk_size;
let threshold = (target_size as f64 * self.config.adaptive_threshold) as usize;
bytes_read < threshold || bytes_read > target_size * 2
}
async fn adjust_chunk_size_adaptive(&mut self) {
let avg_processing_time = self.performance_monitor.average_chunk_processing_time();
let memory_pressure = self.performance_monitor.memory_pressure_detected();
if memory_pressure {
self.config.chunk_size = (self.config.chunk_size / 2).max(1024);
} else if avg_processing_time < Duration::from_millis(10) {
self.config.chunk_size = (self.config.chunk_size * 2).min(1024 * 1024);
}
}
}
/// One unit of raw input flowing from the reader task to the processing
/// task. `data` is an owned copy of the bytes read into the pooled buffer.
#[derive(Debug)]
struct ProcessingChunk {
    // Raw bytes of this chunk; parsed as a standalone JSON document.
    data: Vec<u8>,
    // When the chunk was created; currently unused downstream.
    #[allow(dead_code)]
    timestamp: Instant,
    // Cumulative byte offset at creation, used as an ordering id; currently
    // unused downstream.
    #[allow(dead_code)]
    sequence_id: usize,
}
/// Aggregate statistics for one `stream_parse` run, snapshotted from
/// [`PerformanceMonitor`].
#[derive(Debug, Clone)]
pub struct StreamingStatistics {
    /// Bytes read from the input stream.
    pub total_bytes_processed: usize,
    /// Triples produced by the extraction stage.
    pub total_triples_parsed: usize,
    /// Wall-clock time since the monitor was created.
    pub processing_time: Duration,
    /// Throughput in MiB per second of wall-clock time.
    pub average_throughput_mbps: f64,
    /// Count of parse failures recorded by the monitor.
    pub parse_errors: usize,
    /// hits / (hits + misses) for the context cache; 0.0 when no lookups.
    pub context_cache_hit_ratio: f64,
    /// SIMD operation tally from the monitor.
    pub simd_operations_count: usize,
    /// Zero-copy operation tally from the monitor.
    pub zero_copy_operations_count: usize,
}
impl PerformanceMonitor {
    /// Creates a monitor with zeroed counters and the wall clock started now.
    fn new() -> Self {
        Self {
            total_bytes_processed: AtomicUsize::new(0),
            total_triples_parsed: AtomicUsize::new(0),
            parse_errors: AtomicUsize::new(0),
            context_cache_hits: AtomicUsize::new(0),
            context_cache_misses: AtomicUsize::new(0),
            simd_operations: AtomicUsize::new(0),
            zero_copy_operations: AtomicUsize::new(0),
            start_time: Instant::now(),
            chunk_processing_times: Arc::new(Mutex::new(VecDeque::with_capacity(1000))),
        }
    }

    /// Adds `bytes` to the input byte counter (relaxed: independent tally).
    fn record_bytes_processed(&self, bytes: usize) {
        self.total_bytes_processed
            .fetch_add(bytes, Ordering::Relaxed);
    }

    /// Adds `count` to the parsed-triple counter.
    fn record_triples_parsed(&self, count: usize) {
        self.total_triples_parsed
            .fetch_add(count, Ordering::Relaxed);
    }

    /// Tells the pipeline to flush its batch early when chunks are slow
    /// (average processing time above 100 ms).
    fn should_flush_batch(&self) -> bool {
        self.average_chunk_processing_time() > Duration::from_millis(100)
    }

    /// Mean of the recorded chunk processing times; 1 ms when no samples
    /// have been recorded yet.
    ///
    /// NOTE(review): nothing in this file pushes samples into
    /// `chunk_processing_times`, so this currently always returns 1 ms —
    /// verify a recording call site exists elsewhere.
    fn average_chunk_processing_time(&self) -> Duration {
        let times = self.chunk_processing_times.lock();
        if times.is_empty() {
            return Duration::from_millis(1);
        }
        let total: Duration = times.iter().sum();
        total / times.len() as u32
    }

    /// Placeholder: memory-pressure detection is not implemented yet.
    fn memory_pressure_detected(&self) -> bool {
        false
    }

    /// Snapshots all counters into a [`StreamingStatistics`].
    fn get_statistics(&self) -> StreamingStatistics {
        let elapsed = self.start_time.elapsed();
        let bytes = self.total_bytes_processed.load(Ordering::Relaxed);
        let triples = self.total_triples_parsed.load(Ordering::Relaxed);
        let errors = self.parse_errors.load(Ordering::Relaxed);
        let cache_hits = self.context_cache_hits.load(Ordering::Relaxed);
        let cache_misses = self.context_cache_misses.load(Ordering::Relaxed);
        let simd_ops = self.simd_operations.load(Ordering::Relaxed);
        let zero_copy_ops = self.zero_copy_operations.load(Ordering::Relaxed);
        // Fixed: the gate was `elapsed.as_secs() > 0`, which reported 0 MB/s
        // for any run shorter than one whole second even though the division
        // below already uses fractional seconds. Gate on the fractional
        // value instead, guarding only true division-by-zero.
        let throughput_mbps = if elapsed.as_secs_f64() > 0.0 {
            (bytes as f64) / (1024.0 * 1024.0) / elapsed.as_secs_f64()
        } else {
            0.0
        };
        let cache_hit_ratio = if cache_hits + cache_misses > 0 {
            cache_hits as f64 / (cache_hits + cache_misses) as f64
        } else {
            0.0
        };
        StreamingStatistics {
            total_bytes_processed: bytes,
            total_triples_parsed: triples,
            processing_time: elapsed,
            average_throughput_mbps: throughput_mbps,
            parse_errors: errors,
            context_cache_hit_ratio: cache_hit_ratio,
            simd_operations_count: simd_ops,
            zero_copy_operations_count: zero_copy_ops,
        }
    }
}
impl BufferPool {
    /// Creates a pool of `buffer_size`-byte buffers, allocated lazily up to
    /// `max_buffers`.
    fn new(buffer_size: usize, max_buffers: usize) -> Self {
        Self {
            available_buffers: Arc::new(Mutex::new(Vec::with_capacity(max_buffers))),
            buffer_size,
            max_buffers,
            current_buffers: AtomicUsize::new(0),
        }
    }

    /// Obtains a buffer: reuses a pooled one, allocates a new one while
    /// under the cap, or sleeps and retries until one is returned.
    async fn get_buffer(&self) -> ZeroCopyBuffer {
        loop {
            {
                // Inner scope so the (synchronous) lock is released before
                // any await point below.
                let mut buffers = self.available_buffers.lock();
                if let Some(buffer) = buffers.pop() {
                    return buffer;
                }
            }
            // Atomically reserve an allocation slot. Fixed: the previous
            // load-then-fetch_add was a check-then-act race — two tasks
            // could both observe `count < max_buffers` and both increment,
            // exceeding the cap.
            let reserved = self
                .current_buffers
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                    (n < self.max_buffers).then_some(n + 1)
                })
                .is_ok();
            if reserved {
                return ZeroCopyBuffer::new(self.buffer_size);
            }
            // Pool exhausted: back off briefly, then retry.
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
    }

    /// Returns `buffer` to the pool after resetting it; if the free list is
    /// already full, the buffer is dropped and its slot released.
    #[allow(dead_code)]
    fn return_buffer(&self, mut buffer: ZeroCopyBuffer) {
        buffer.reset();
        let mut buffers = self.available_buffers.lock();
        if buffers.len() < self.max_buffers {
            buffers.push(buffer);
        } else {
            self.current_buffers.fetch_sub(1, Ordering::Relaxed);
        }
    }
}
/// [`StreamingSink`] that accumulates all output in memory — useful for
/// tests and small inputs.
pub struct MemoryStreamingSink {
    // All triples received so far, shared so callers can observe them.
    triples: Arc<RwLock<Vec<Triple>>>,
    // All quads received so far.
    quads: Arc<RwLock<Vec<Quad>>>,
    // Running statistics updated by the batch methods.
    statistics: Arc<RwLock<SinkStatistics>>,
}
impl Default for MemoryStreamingSink {
fn default() -> Self {
Self::new()
}
}
impl MemoryStreamingSink {
    /// Creates an empty in-memory sink with zeroed statistics.
    pub fn new() -> Self {
        let initial_stats = SinkStatistics {
            total_triples_processed: 0,
            total_quads_processed: 0,
            average_batch_size: 0.0,
            processing_rate_per_second: 0.0,
            memory_usage_bytes: 0,
        };
        Self {
            triples: Arc::new(RwLock::new(Vec::new())),
            quads: Arc::new(RwLock::new(Vec::new())),
            statistics: Arc::new(RwLock::new(initial_stats)),
        }
    }

    /// Consumes the sink and hands back the shared triple store.
    pub fn into_triples(self) -> Arc<RwLock<Vec<Triple>>> {
        let Self { triples, .. } = self;
        triples
    }

    /// Returns a snapshot copy of all triples collected so far.
    pub async fn get_triples(&self) -> Vec<Triple> {
        let guard = self.triples.read().await;
        guard.clone()
    }

    /// Returns a snapshot copy of all quads collected so far.
    pub async fn get_quads(&self) -> Vec<Quad> {
        let guard = self.quads.read().await;
        guard.clone()
    }
}
/// Sink error type wrapping any boxed `Send + Sync` error.
#[derive(Debug)]
pub struct StreamingError(Box<dyn StdError + Send + Sync>);
impl std::fmt::Display for StreamingError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Streaming error: {}", self.0)
}
}
impl StdError for StreamingError {
    /// Exposes the wrapped error as the source for error-chain walking.
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        Some(self.0.as_ref())
    }
}
impl From<Box<dyn StdError + Send + Sync>> for StreamingError {
    /// Wraps an already-boxed error without re-boxing.
    fn from(err: Box<dyn StdError + Send + Sync>) -> Self {
        Self(err)
    }
}
#[async_trait::async_trait]
impl StreamingSink for MemoryStreamingSink {
    type Error = StreamingError;

    /// Appends the batch to the in-memory triple store and updates counters.
    async fn process_triple_batch(&mut self, triples: Vec<Triple>) -> Result<(), Self::Error> {
        let batch_size = triples.len();
        self.triples.write().await.extend(triples);
        let mut stats = self.statistics.write().await;
        stats.total_triples_processed += batch_size;
        // NOTE(review): this is an exponential moving average (each new
        // batch weighted 50%), not a true mean of all batch sizes.
        stats.average_batch_size = (stats.average_batch_size + batch_size as f64) / 2.0;
        Ok(())
    }

    /// Appends the batch to the in-memory quad store and updates counters.
    async fn process_quad_batch(&mut self, quads: Vec<Quad>) -> Result<(), Self::Error> {
        let batch_size = quads.len();
        self.quads.write().await.extend(quads);
        let mut stats = self.statistics.write().await;
        stats.total_quads_processed += batch_size;
        Ok(())
    }

    /// No-op: data already lives in memory.
    async fn flush(&mut self) -> Result<(), Self::Error> {
        Ok(())
    }

    /// Returns a snapshot of the tracked statistics.
    ///
    /// Fixed: previously this always returned hard-coded zeros, discarding
    /// the statistics the batch methods maintain. `try_read` is used
    /// because this method is synchronous while the stats live behind an
    /// async `RwLock`; if the lock happens to be held by a writer at this
    /// instant, an all-zero snapshot is returned instead of blocking.
    fn performance_statistics(&self) -> SinkStatistics {
        match self.statistics.try_read() {
            Ok(stats) => stats.clone(),
            Err(_) => SinkStatistics {
                total_triples_processed: 0,
                total_quads_processed: 0,
                average_batch_size: 0.0,
                processing_rate_per_second: 0.0,
                memory_usage_bytes: 0,
            },
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// End-to-end smoke test: parse a two-record JSON-LD array through the
    /// full pipeline into an in-memory sink.
    #[tokio::test]
    async fn test_ultra_streaming_parser() {
        let input = r#"[
            {
                "@id": "http://example.org/person/1",
                "name": "Alice",
                "age": 30
            },
            {
                "@id": "http://example.org/person/2",
                "name": "Bob",
                "age": 25
            }
        ]"#;
        let mut parser = UltraStreamingJsonLdParser::new(StreamingConfig::default());
        let byte_reader = Cursor::new(input.as_bytes());
        let sink = MemoryStreamingSink::new();
        // Keep a handle to the shared triple store before the sink moves.
        let _shared_triples = Arc::clone(&sink.triples);
        let stats = parser
            .stream_parse(byte_reader, sink)
            .await
            .expect("async operation should succeed");
        assert!(stats.total_bytes_processed > 0);
    }
}
}