1use crate::{
7 jsonld::JsonLdParseError,
8 model::{NamedNode, Object, Predicate, Quad, Subject, Triple},
9 optimization::{SimdJsonProcessor, TermInterner, TermInternerExt, ZeroCopyBuffer},
10};
11use dashmap::DashMap;
13use parking_lot::Mutex;
15#[cfg(feature = "parallel")]
16use rayon::prelude::*;
17use serde_json::{Map, Value};
18use std::{
19 collections::VecDeque,
20 error::Error as StdError,
21 sync::{
22 atomic::{AtomicUsize, Ordering},
23 Arc,
24 },
25};
26use tokio::{
27 io::{AsyncRead, AsyncReadExt, BufReader},
28 sync::{mpsc, RwLock, Semaphore},
29 time::{Duration, Instant},
30};
31
/// High-throughput streaming JSON-LD parser.
///
/// Combines a shared `@context` cache, term interning, optional SIMD JSON
/// parsing and a pooled read buffer; see
/// [`UltraStreamingJsonLdParser::stream_parse`] for the pipeline.
pub struct UltraStreamingJsonLdParser {
    /// Tunables controlling chunking, concurrency and batching.
    config: StreamingConfig,
    /// Resolved `@context` documents keyed by their IRI string.
    context_cache: Arc<DashMap<String, Arc<Value>>>,
    /// Shared interner for IRIs, blank nodes and literals.
    term_interner: Arc<TermInterner>,
    /// Counters and timings collected during a parse run.
    performance_monitor: Arc<PerformanceMonitor>,
    /// SIMD-accelerated JSON parser, used when `config.enable_simd` is set.
    simd_processor: SimdJsonProcessor,
    /// Pool of reusable read buffers.
    buffer_pool: Arc<BufferPool>,
}
41
/// Tuning knobs for [`UltraStreamingJsonLdParser`].
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Target read-chunk size in bytes (adjusted adaptively at runtime).
    pub chunk_size: usize,
    /// Number of permits on the processing semaphore.
    pub max_concurrent_threads: usize,
    /// Pooled buffer size in bytes.
    ///
    /// NOTE(review): also reused as the chunk-channel capacity and the
    /// triple-batch flush threshold (both item counts) — confirm intent.
    pub buffer_size: usize,
    /// Use the SIMD JSON parser instead of `serde_json`.
    pub enable_simd: bool,
    /// Initial capacity (entries) of the `@context` cache.
    pub context_cache_size: usize,
    /// Fraction of `chunk_size` below which a read triggers adaptive resizing.
    pub adaptive_threshold: f64,
    /// Memory ceiling in bytes intended for pressure-based chunk shrinking.
    pub memory_pressure_threshold: usize,
    /// Requested zero-copy aggressiveness.
    pub zero_copy_level: ZeroCopyLevel,
    /// Collect detailed profiling data while parsing.
    pub enable_profiling: bool,
}
64
/// Degree of zero-copy optimization to apply, in ascending aggressiveness.
///
/// NOTE(review): this setting is not consumed anywhere in this file —
/// confirm where (or whether) it currently takes effect.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ZeroCopyLevel {
    None,
    Basic,
    Advanced,
    Maximum,
}
77
/// Atomic counters plus timing data gathered during one parse run.
///
/// NOTE(review): only the byte and triple counters are incremented in this
/// file; the error/cache/SIMD/zero-copy counters and the chunk-time deque
/// are declared but never written here — confirm other call sites exist.
pub struct PerformanceMonitor {
    /// Total input bytes consumed.
    total_bytes_processed: AtomicUsize,
    /// Total triples produced.
    total_triples_parsed: AtomicUsize,
    /// Chunk parse failures.
    parse_errors: AtomicUsize,
    /// `@context` cache hits.
    context_cache_hits: AtomicUsize,
    /// `@context` cache misses.
    context_cache_misses: AtomicUsize,
    /// SIMD parse operations performed.
    simd_operations: AtomicUsize,
    /// Zero-copy operations performed.
    zero_copy_operations: AtomicUsize,
    /// Anchor for elapsed-time / throughput calculations.
    start_time: Instant,
    /// Sliding window of per-chunk processing durations.
    chunk_processing_times: Arc<Mutex<VecDeque<Duration>>>,
}
90
/// Bounded pool of reusable [`ZeroCopyBuffer`]s.
pub struct BufferPool {
    /// Buffers ready for reuse.
    available_buffers: Arc<Mutex<Vec<ZeroCopyBuffer>>>,
    /// Size in bytes of each allocated buffer.
    buffer_size: usize,
    /// Upper bound on buffers alive at once.
    max_buffers: usize,
    /// Count of buffers currently allocated (pooled or handed out).
    current_buffers: AtomicUsize,
}
98
/// Consumer of parsed RDF batches produced by the streaming parser.
#[async_trait::async_trait]
pub trait StreamingSink: Send + Sync {
    /// Error type surfaced to the parser when the sink fails.
    type Error: Send + Sync + std::error::Error + 'static;

    /// Handles one batch of triples.
    async fn process_triple_batch(&mut self, triples: Vec<Triple>) -> Result<(), Self::Error>;
    /// Handles one batch of quads.
    async fn process_quad_batch(&mut self, quads: Vec<Quad>) -> Result<(), Self::Error>;
    /// Finalizes any buffered state once the stream ends.
    async fn flush(&mut self) -> Result<(), Self::Error>;
    /// Snapshot of the sink's own processing statistics.
    fn performance_statistics(&self) -> SinkStatistics;
}
109
/// Statistics a [`StreamingSink`] reports about its own processing.
#[derive(Debug, Clone)]
pub struct SinkStatistics {
    /// Triples accepted so far.
    pub total_triples_processed: usize,
    /// Quads accepted so far.
    pub total_quads_processed: usize,
    /// Rolling estimate of the batch size delivered to the sink.
    pub average_batch_size: f64,
    /// Estimated item throughput per second.
    pub processing_rate_per_second: f64,
    /// Approximate memory held by the sink, in bytes.
    pub memory_usage_bytes: usize,
}
119
120impl Default for StreamingConfig {
121 fn default() -> Self {
122 Self {
123 chunk_size: 64 * 1024, max_concurrent_threads: num_cpus::get() * 2,
125 buffer_size: 1024 * 1024, enable_simd: true,
127 context_cache_size: 10000,
128 adaptive_threshold: 0.8,
129 memory_pressure_threshold: 8 * 1024 * 1024 * 1024, zero_copy_level: ZeroCopyLevel::Advanced,
131 enable_profiling: true,
132 }
133 }
134}
135
impl UltraStreamingJsonLdParser {
    /// Constructs a parser from `config`.
    ///
    /// Pre-sizes the context cache to `config.context_cache_size` entries
    /// and creates a pool of up to 100 reusable buffers of
    /// `config.buffer_size` bytes each.
    pub fn new(config: StreamingConfig) -> Self {
        Self {
            context_cache: Arc::new(DashMap::with_capacity(config.context_cache_size)),
            term_interner: Arc::new(TermInterner::new()),
            performance_monitor: Arc::new(PerformanceMonitor::new()),
            simd_processor: SimdJsonProcessor::new(),
            buffer_pool: Arc::new(BufferPool::new(config.buffer_size, 100)),
            config,
        }
    }

    /// Streams JSON-LD bytes from `reader`, parses them chunk-by-chunk, and
    /// delivers batches of triples to `sink`.
    ///
    /// Pipeline: this task reads chunks and sends them over `tx`; a spawned
    /// processing task parses each chunk into triples and batches them onto
    /// `triple_tx`; a spawned sink task drains batches into `sink` and
    /// flushes once the channel closes. Returns aggregate
    /// [`StreamingStatistics`] on success.
    ///
    /// NOTE(review): chunks are cut at fixed byte boundaries and each chunk
    /// is parsed as a stand-alone JSON document — input split mid-value
    /// would fail to parse; confirm callers guarantee chunk-aligned framing.
    pub async fn stream_parse<R, S>(
        &mut self,
        reader: R,
        mut sink: S,
    ) -> Result<StreamingStatistics, JsonLdParseError>
    where
        R: AsyncRead + Unpin + Send + 'static,
        S: StreamingSink + Send + 'static,
        S::Error: 'static,
    {
        let mut buf_reader = BufReader::with_capacity(self.config.chunk_size, reader);
        // NOTE(review): `buffer_size` is a byte count but is used here as
        // the channel *capacity* (number of in-flight chunks) — confirm.
        let (tx, mut rx) = mpsc::channel::<ProcessingChunk>(self.config.buffer_size);
        let (triple_tx, mut triple_rx) = mpsc::channel::<Vec<Triple>>(100);
        let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_threads));

        // Sink task: drain triple batches, then flush when the channel closes.
        let sink_handle = tokio::spawn(async move {
            while let Some(batch) = triple_rx.recv().await {
                sink.process_triple_batch(batch)
                    .await
                    .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;
            }

            sink.flush()
                .await
                .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;

            Ok::<(), JsonLdParseError>(())
        });

        // Processing task: parse chunks into triples and batch them up.
        let processing_handle = tokio::spawn({
            let config = self.config.clone();
            let context_cache = Arc::clone(&self.context_cache);
            let term_interner = Arc::clone(&self.term_interner);
            let performance_monitor = Arc::clone(&self.performance_monitor);
            let simd_processor = self.simd_processor.clone();
            let triple_tx = triple_tx.clone();

            async move {
                let mut batch_buffer = Vec::with_capacity(config.buffer_size);

                while let Some(chunk) = rx.recv().await {
                    // Permit is held only for this iteration; chunks are in
                    // fact processed sequentially within this single task.
                    let _permit = semaphore
                        .acquire()
                        .await
                        .expect("semaphore should not be closed");

                    let processed_triples = if config.enable_simd {
                        Self::process_chunk_simd(
                            chunk,
                            &context_cache,
                            &term_interner,
                            &simd_processor,
                        )
                        .await?
                    } else {
                        Self::process_chunk_standard(chunk, &context_cache, &term_interner).await?
                    };

                    performance_monitor.record_triples_parsed(processed_triples.len());

                    batch_buffer.extend(processed_triples);

                    // NOTE(review): `buffer_size` (bytes) is compared with a
                    // triple *count* here — confirm the intended batch size.
                    if batch_buffer.len() >= config.buffer_size
                        || performance_monitor.should_flush_batch()
                    {
                        triple_tx
                            .send(std::mem::take(&mut batch_buffer))
                            .await
                            .map_err(|_| {
                                JsonLdParseError::ProcessingError(
                                    "Triple channel send failed".to_string(),
                                )
                            })?;
                    }
                }

                // Flush any partial final batch.
                if !batch_buffer.is_empty() {
                    triple_tx.send(batch_buffer).await.map_err(|_| {
                        JsonLdParseError::ProcessingError("Triple channel send failed".to_string())
                    })?;
                }

                Ok::<(), JsonLdParseError>(())
            }
        });

        // Reader loop: fill a pooled buffer and forward a copy of the bytes.
        let mut buffer = self.buffer_pool.get_buffer().await;
        let mut total_bytes = 0;

        loop {
            match buf_reader.read(buffer.as_mut_slice()).await {
                // EOF: stop reading.
                Ok(0) => break,
                Ok(n) => {
                    buffer.set_len(n);
                    total_bytes += n;
                    self.performance_monitor.record_bytes_processed(n);

                    if self.should_adjust_chunk_size(n) {
                        self.adjust_chunk_size_adaptive().await;
                    }

                    let chunk = ProcessingChunk {
                        // Copies the bytes out of the pooled buffer.
                        data: buffer.as_slice().to_vec(),
                        timestamp: Instant::now(),
                        sequence_id: total_bytes,
                    };

                    tx.send(chunk).await.map_err(|_| {
                        JsonLdParseError::ProcessingError("Channel send failed".to_string())
                    })?;

                    // NOTE(review): the previous buffer is dropped rather than
                    // returned to the pool — `return_buffer` is never called
                    // on this path, so pooling is effectively one-shot.
                    buffer = self.buffer_pool.get_buffer().await;
                }
                Err(e) => return Err(JsonLdParseError::Io(e)),
            }
        }

        // Closing `tx` lets the processing task finish its recv loop.
        drop(tx);
        processing_handle
            .await
            .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))??;

        // Closing `triple_tx` lets the sink task finish and flush.
        drop(triple_tx);
        sink_handle
            .await
            .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))??;

        Ok(self.performance_monitor.get_statistics())
    }

    /// Parses one chunk with the SIMD JSON processor and extracts triples.
    ///
    /// Context resolution is shared with the standard path; extraction is
    /// parallelized when the `parallel` feature is enabled.
    async fn process_chunk_simd(
        chunk: ProcessingChunk,
        context_cache: &DashMap<String, Arc<Value>>,
        term_interner: &TermInterner,
        simd_processor: &SimdJsonProcessor,
    ) -> Result<Vec<Triple>, JsonLdParseError> {
        let start = Instant::now();

        let json_value = simd_processor
            .parse_json(&chunk.data)
            .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;

        let context = Self::resolve_context_zero_copy(&json_value, context_cache).await?;

        #[cfg(feature = "parallel")]
        let triples = Self::extract_triples_parallel(&json_value, &context, term_interner).await?;
        #[cfg(not(feature = "parallel"))]
        let triples = Self::extract_triples_standard(&json_value, &context, term_interner).await?;

        // Timing is measured but not recorded anywhere yet.
        let _processing_time = start.elapsed();
        Ok(triples)
    }

    /// Parses one chunk with `serde_json` (non-SIMD fallback) and extracts
    /// triples sequentially.
    async fn process_chunk_standard(
        chunk: ProcessingChunk,
        context_cache: &DashMap<String, Arc<Value>>,
        term_interner: &TermInterner,
    ) -> Result<Vec<Triple>, JsonLdParseError> {
        let json_value: Value = serde_json::from_slice(&chunk.data)
            .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;

        let context = Self::resolve_context_cached(&json_value, context_cache).await?;

        let triples = Self::extract_triples_standard(&json_value, &context, term_interner).await?;

        Ok(triples)
    }

    /// Resolves the document's `@context`, serving repeats from the cache.
    ///
    /// Only string-valued `@context` references are resolved; inline object
    /// contexts (and a missing `@context`) yield an empty context object.
    async fn resolve_context_zero_copy(
        json_value: &Value,
        context_cache: &DashMap<String, Arc<Value>>,
    ) -> Result<Arc<Value>, JsonLdParseError> {
        if let Some(context_ref) = json_value.get("@context") {
            if let Some(context_str) = context_ref.as_str() {
                if let Some(cached_context) = context_cache.get(context_str) {
                    return Ok(Arc::clone(&cached_context));
                }

                let resolved_context = Self::resolve_remote_context(context_str).await?;
                let context_arc = Arc::new(resolved_context);
                context_cache.insert(context_str.to_string(), Arc::clone(&context_arc));
                return Ok(context_arc);
            }
        }

        Ok(Arc::new(Value::Object(Map::new())))
    }

    /// Alias for [`Self::resolve_context_zero_copy`], kept for the
    /// non-SIMD call path.
    async fn resolve_context_cached(
        json_value: &Value,
        context_cache: &DashMap<String, Arc<Value>>,
    ) -> Result<Arc<Value>, JsonLdParseError> {
        Self::resolve_context_zero_copy(json_value, context_cache).await
    }

    /// Extracts triples from a top-level JSON array in parallel via rayon;
    /// non-array values fall back to single-object extraction.
    #[cfg(feature = "parallel")]
    async fn extract_triples_parallel(
        json_value: &Value,
        context: &Value,
        term_interner: &TermInterner,
    ) -> Result<Vec<Triple>, JsonLdParseError> {
        if let Value::Array(objects) = json_value {
            let triples: Result<Vec<Vec<Triple>>, JsonLdParseError> = objects
                .par_iter()
                .map(|obj| Self::extract_triples_from_object(obj, context, term_interner))
                .collect();

            Ok(triples?.into_iter().flatten().collect())
        } else {
            Self::extract_triples_from_object(json_value, context, term_interner)
        }
    }

    /// Sequential triple extraction for a single JSON-LD value.
    async fn extract_triples_standard(
        json_value: &Value,
        context: &Value,
        term_interner: &TermInterner,
    ) -> Result<Vec<Triple>, JsonLdParseError> {
        Self::extract_triples_from_object(json_value, context, term_interner)
    }

    /// Converts one JSON-LD node object into triples.
    ///
    /// `@id` becomes the subject (a fresh blank node when absent); every
    /// non-keyword property is expanded via [`Self::expand_property`] and
    /// each of its values becomes at most one triple. Non-object input
    /// yields no triples.
    fn extract_triples_from_object(
        obj: &Value,
        context: &Value,
        term_interner: &TermInterner,
    ) -> Result<Vec<Triple>, JsonLdParseError> {
        let mut triples = Vec::new();

        if let Value::Object(map) = obj {
            let subject: Subject = if let Some(id) = map.get("@id") {
                Subject::NamedNode(term_interner.intern_named_node(id.as_str().ok_or_else(
                    || JsonLdParseError::ProcessingError("Invalid @id".to_string()),
                )?)?)
            } else {
                Subject::BlankNode(term_interner.intern_blank_node())
            };

            for (key, value) in map {
                // Skip JSON-LD keywords (@id, @context, @type, ...).
                if key.starts_with('@') {
                    continue;
                }

                let predicate_iri = Self::expand_property(key, context)?;
                let predicate = term_interner.intern_named_node(&predicate_iri)?;

                match value {
                    // Arrays fan out into one triple per element.
                    Value::Array(values) => {
                        for val in values {
                            if let Some(triple) = Self::create_triple_from_value(
                                subject.clone(),
                                predicate.clone(),
                                val,
                                context,
                                term_interner,
                            )? {
                                triples.push(triple);
                            }
                        }
                    }
                    _ => {
                        if let Some(triple) = Self::create_triple_from_value(
                            subject.clone(),
                            predicate,
                            value,
                            context,
                            term_interner,
                        )? {
                            triples.push(triple);
                        }
                    }
                }
            }
        }

        Ok(triples)
    }

    /// Builds a single triple for `value`, or `Ok(None)` for values that
    /// carry no object (null, nested arrays, objects lacking both `@id`
    /// and `@value`).
    ///
    /// String handling is heuristic: values beginning with `http://` or
    /// `https://` become IRIs, everything else becomes a plain literal.
    fn create_triple_from_value(
        subject: Subject,
        predicate: NamedNode,
        value: &Value,
        _context: &Value,
        term_interner: &TermInterner,
    ) -> Result<Option<Triple>, JsonLdParseError> {
        let object: Object = match value {
            Value::String(s) => {
                if s.starts_with("http://") || s.starts_with("https://") {
                    Object::NamedNode(term_interner.intern_named_node(s)?)
                } else {
                    Object::Literal(term_interner.intern_literal(s)?)
                }
            }
            Value::Object(obj) => {
                if let Some(id) = obj.get("@id") {
                    Object::NamedNode(term_interner.intern_named_node(id.as_str().ok_or_else(
                        || JsonLdParseError::ProcessingError("Invalid @id in object".to_string()),
                    )?)?)
                } else if let Some(val) = obj.get("@value") {
                    // Only string @value payloads are accepted here.
                    let literal_value = val.as_str().ok_or_else(|| {
                        JsonLdParseError::ProcessingError("Invalid @value".to_string())
                    })?;

                    if let Some(datatype) = obj.get("@type") {
                        let datatype_iri = datatype.as_str().ok_or_else(|| {
                            JsonLdParseError::ProcessingError("Invalid @type".to_string())
                        })?;
                        Object::Literal(
                            term_interner
                                .intern_literal_with_datatype(literal_value, datatype_iri)?,
                        )
                    } else if let Some(lang) = obj.get("@language") {
                        let language = lang.as_str().ok_or_else(|| {
                            JsonLdParseError::ProcessingError("Invalid @language".to_string())
                        })?;
                        Object::Literal(
                            term_interner.intern_literal_with_language(literal_value, language)?,
                        )
                    } else {
                        Object::Literal(term_interner.intern_literal(literal_value)?)
                    }
                } else {
                    // Neither @id nor @value: nothing to emit.
                    return Ok(None);
                }
            }
            // Numbers and booleans are stored as their string forms.
            Value::Number(n) => Object::Literal(term_interner.intern_literal(&n.to_string())?),
            Value::Bool(b) => Object::Literal(term_interner.intern_literal(&b.to_string())?),
            _ => return Ok(None),
        };

        Ok(Some(Triple::new(
            subject,
            Predicate::NamedNode(predicate),
            object,
        )))
    }

    /// Expands a JSON-LD property name to a full IRI.
    ///
    /// Keys containing `:` pass through unchanged (assumed to already be
    /// IRIs — note this also leaves compact IRIs unexpanded); otherwise a
    /// string-valued context entry is used, with
    /// `http://example.org/<property>` as the placeholder fallback.
    fn expand_property(property: &str, context: &Value) -> Result<String, JsonLdParseError> {
        if property.contains(':') {
            Ok(property.to_string())
        } else if let Value::Object(ctx) = context {
            if let Some(expanded) = ctx.get(property) {
                if let Some(iri) = expanded.as_str() {
                    Ok(iri.to_string())
                } else {
                    Ok(format!("http://example.org/{property}"))
                }
            } else {
                Ok(format!("http://example.org/{property}"))
            }
        } else {
            Ok(format!("http://example.org/{property}"))
        }
    }

    /// Stub remote-context resolver: always returns an empty context.
    /// TODO: fetch and parse the document at `_context_iri`.
    async fn resolve_remote_context(_context_iri: &str) -> Result<Value, JsonLdParseError> {
        Ok(Value::Object(Map::new()))
    }

    /// Returns `true` when the last read was below the adaptive threshold
    /// of the target chunk size, or more than twice the target.
    fn should_adjust_chunk_size(&self, bytes_read: usize) -> bool {
        let target_size = self.config.chunk_size;
        let threshold = (target_size as f64 * self.config.adaptive_threshold) as usize;
        bytes_read < threshold || bytes_read > target_size * 2
    }

    /// Adapts `chunk_size`: halve (floor 1 KiB) under memory pressure,
    /// double (cap 1 MiB) when chunks process quickly.
    ///
    /// NOTE(review): chunk processing times are never recorded in this
    /// file, so the average is the 1 ms placeholder and the doubling branch
    /// always fires when reached — confirm this is intended.
    async fn adjust_chunk_size_adaptive(&mut self) {
        let avg_processing_time = self.performance_monitor.average_chunk_processing_time();
        let memory_pressure = self.performance_monitor.memory_pressure_detected();

        if memory_pressure {
            self.config.chunk_size = (self.config.chunk_size / 2).max(1024);
        } else if avg_processing_time < Duration::from_millis(10) {
            self.config.chunk_size = (self.config.chunk_size * 2).min(1024 * 1024);
        }
    }
}
568
/// One raw byte chunk flowing from the reader loop to the processing task.
#[derive(Debug)]
struct ProcessingChunk {
    /// Copied chunk bytes.
    data: Vec<u8>,
    /// When the chunk was read (currently unused).
    #[allow(dead_code)]
    timestamp: Instant,
    /// Cumulative byte offset at the end of this chunk (currently unused).
    #[allow(dead_code)]
    sequence_id: usize,
}
578
/// Aggregate metrics for one `stream_parse` run, snapshotted from the
/// [`PerformanceMonitor`].
#[derive(Debug, Clone)]
pub struct StreamingStatistics {
    /// Total input bytes consumed.
    pub total_bytes_processed: usize,
    /// Total triples produced.
    pub total_triples_parsed: usize,
    /// Wall-clock time since the monitor was created.
    pub processing_time: Duration,
    /// Input throughput in MiB per second.
    pub average_throughput_mbps: f64,
    /// Chunk parse failures observed.
    pub parse_errors: usize,
    /// `@context` cache hits / (hits + misses); 0.0 when no lookups.
    pub context_cache_hit_ratio: f64,
    /// SIMD parse operations performed.
    pub simd_operations_count: usize,
    /// Zero-copy operations performed.
    pub zero_copy_operations_count: usize,
}
591
592impl PerformanceMonitor {
593 fn new() -> Self {
594 Self {
595 total_bytes_processed: AtomicUsize::new(0),
596 total_triples_parsed: AtomicUsize::new(0),
597 parse_errors: AtomicUsize::new(0),
598 context_cache_hits: AtomicUsize::new(0),
599 context_cache_misses: AtomicUsize::new(0),
600 simd_operations: AtomicUsize::new(0),
601 zero_copy_operations: AtomicUsize::new(0),
602 start_time: Instant::now(),
603 chunk_processing_times: Arc::new(Mutex::new(VecDeque::with_capacity(1000))),
604 }
605 }
606
607 fn record_bytes_processed(&self, bytes: usize) {
608 self.total_bytes_processed
609 .fetch_add(bytes, Ordering::Relaxed);
610 }
611
612 fn record_triples_parsed(&self, count: usize) {
613 self.total_triples_parsed
614 .fetch_add(count, Ordering::Relaxed);
615 }
616
617 fn should_flush_batch(&self) -> bool {
618 self.average_chunk_processing_time() > Duration::from_millis(100)
620 }
621
622 fn average_chunk_processing_time(&self) -> Duration {
623 let times = self.chunk_processing_times.lock();
624 if times.is_empty() {
625 return Duration::from_millis(1);
626 }
627
628 let total: Duration = times.iter().sum();
629 total / times.len() as u32
630 }
631
632 fn memory_pressure_detected(&self) -> bool {
633 false }
636
637 fn get_statistics(&self) -> StreamingStatistics {
638 let elapsed = self.start_time.elapsed();
639 let bytes = self.total_bytes_processed.load(Ordering::Relaxed);
640 let triples = self.total_triples_parsed.load(Ordering::Relaxed);
641 let errors = self.parse_errors.load(Ordering::Relaxed);
642 let cache_hits = self.context_cache_hits.load(Ordering::Relaxed);
643 let cache_misses = self.context_cache_misses.load(Ordering::Relaxed);
644 let simd_ops = self.simd_operations.load(Ordering::Relaxed);
645 let zero_copy_ops = self.zero_copy_operations.load(Ordering::Relaxed);
646
647 let throughput_mbps = if elapsed.as_secs() > 0 {
648 (bytes as f64) / (1024.0 * 1024.0) / elapsed.as_secs_f64()
649 } else {
650 0.0
651 };
652
653 let cache_hit_ratio = if cache_hits + cache_misses > 0 {
654 cache_hits as f64 / (cache_hits + cache_misses) as f64
655 } else {
656 0.0
657 };
658
659 StreamingStatistics {
660 total_bytes_processed: bytes,
661 total_triples_parsed: triples,
662 processing_time: elapsed,
663 average_throughput_mbps: throughput_mbps,
664 parse_errors: errors,
665 context_cache_hit_ratio: cache_hit_ratio,
666 simd_operations_count: simd_ops,
667 zero_copy_operations_count: zero_copy_ops,
668 }
669 }
670}
671
672impl BufferPool {
673 fn new(buffer_size: usize, max_buffers: usize) -> Self {
674 Self {
675 available_buffers: Arc::new(Mutex::new(Vec::with_capacity(max_buffers))),
676 buffer_size,
677 max_buffers,
678 current_buffers: AtomicUsize::new(0),
679 }
680 }
681
682 async fn get_buffer(&self) -> ZeroCopyBuffer {
683 loop {
684 {
686 let mut buffers = self.available_buffers.lock();
687 if let Some(buffer) = buffers.pop() {
688 return buffer;
689 }
690 } if self.current_buffers.load(Ordering::Relaxed) < self.max_buffers {
693 self.current_buffers.fetch_add(1, Ordering::Relaxed);
694 return ZeroCopyBuffer::new(self.buffer_size);
695 } else {
696 tokio::time::sleep(Duration::from_millis(1)).await;
698 }
699 }
700 }
701
702 #[allow(dead_code)]
703 fn return_buffer(&self, mut buffer: ZeroCopyBuffer) {
704 buffer.reset();
705 let mut buffers = self.available_buffers.lock();
706 if buffers.len() < self.max_buffers {
707 buffers.push(buffer);
708 } else {
709 self.current_buffers.fetch_sub(1, Ordering::Relaxed);
710 }
711 }
712}
713
/// [`StreamingSink`] that accumulates everything in memory; mainly for
/// tests and small inputs.
pub struct MemoryStreamingSink {
    /// All triples received so far.
    triples: Arc<RwLock<Vec<Triple>>>,
    /// All quads received so far.
    quads: Arc<RwLock<Vec<Quad>>>,
    /// Running processing statistics.
    statistics: Arc<RwLock<SinkStatistics>>,
}
720
721impl Default for MemoryStreamingSink {
722 fn default() -> Self {
723 Self::new()
724 }
725}
726
727impl MemoryStreamingSink {
728 pub fn new() -> Self {
729 Self {
730 triples: Arc::new(RwLock::new(Vec::new())),
731 quads: Arc::new(RwLock::new(Vec::new())),
732 statistics: Arc::new(RwLock::new(SinkStatistics {
733 total_triples_processed: 0,
734 total_quads_processed: 0,
735 average_batch_size: 0.0,
736 processing_rate_per_second: 0.0,
737 memory_usage_bytes: 0,
738 })),
739 }
740 }
741
742 pub fn into_triples(self) -> Arc<RwLock<Vec<Triple>>> {
743 self.triples
744 }
745
746 pub async fn get_triples(&self) -> Vec<Triple> {
747 self.triples.read().await.clone()
748 }
749
750 pub async fn get_quads(&self) -> Vec<Quad> {
751 self.quads.read().await.clone()
752 }
753}
754
/// Boxed error wrapper used as the error type of [`MemoryStreamingSink`].
#[derive(Debug)]
pub struct StreamingError(Box<dyn StdError + Send + Sync>);
758
759impl std::fmt::Display for StreamingError {
760 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
761 write!(f, "Streaming error: {}", self.0)
762 }
763}
764
765impl StdError for StreamingError {
766 fn source(&self) -> Option<&(dyn StdError + 'static)> {
767 Some(&*self.0)
768 }
769}
770
771impl From<Box<dyn StdError + Send + Sync>> for StreamingError {
772 fn from(err: Box<dyn StdError + Send + Sync>) -> Self {
773 StreamingError(err)
774 }
775}
776
777#[async_trait::async_trait]
778impl StreamingSink for MemoryStreamingSink {
779 type Error = StreamingError;
780
781 async fn process_triple_batch(&mut self, triples: Vec<Triple>) -> Result<(), Self::Error> {
782 let batch_size = triples.len();
783 self.triples.write().await.extend(triples);
784
785 let mut stats = self.statistics.write().await;
786 stats.total_triples_processed += batch_size;
787 stats.average_batch_size = (stats.average_batch_size + batch_size as f64) / 2.0;
788
789 Ok(())
790 }
791
792 async fn process_quad_batch(&mut self, quads: Vec<Quad>) -> Result<(), Self::Error> {
793 let batch_size = quads.len();
794 self.quads.write().await.extend(quads);
795
796 let mut stats = self.statistics.write().await;
797 stats.total_quads_processed += batch_size;
798
799 Ok(())
800 }
801
802 async fn flush(&mut self) -> Result<(), Self::Error> {
803 Ok(())
805 }
806
807 fn performance_statistics(&self) -> SinkStatistics {
808 SinkStatistics {
810 total_triples_processed: 0,
811 total_quads_processed: 0,
812 average_batch_size: 0.0,
813 processing_rate_per_second: 0.0,
814 memory_usage_bytes: 0,
815 }
816 }
817}
818
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// End-to-end smoke test: stream a small two-node JSON-LD array through
    /// the parser into an in-memory sink and check bytes were consumed.
    #[tokio::test]
    async fn test_ultra_streaming_parser() {
        let json_ld_data = r#"[
        {
            "@id": "http://example.org/person/1",
            "name": "Alice",
            "age": 30
        },
        {
            "@id": "http://example.org/person/2",
            "name": "Bob",
            "age": 25
        }
        ]"#;

        let config = StreamingConfig::default();
        let mut parser = UltraStreamingJsonLdParser::new(config);
        // Cursor over the bytes implements tokio's AsyncRead.
        let reader = Cursor::new(json_ld_data.as_bytes());
        let sink = MemoryStreamingSink::new();

        // Keep a handle to the shared triple store (unused for now).
        let _sink_data = Arc::clone(&sink.triples);

        let stats = parser
            .stream_parse(reader, sink)
            .await
            .expect("async operation should succeed");

        // Only byte accounting is asserted; triple counts are not checked.
        assert!(stats.total_bytes_processed > 0);
    }
}
857}