Skip to main content

oxirs_core/jsonld/
streaming.rs

//! Ultra-high performance streaming JSON-LD processing
//!
//! This module provides advanced streaming capabilities for JSON-LD processing
//! with zero-copy operations, SIMD acceleration, and adaptive buffering.
5
6use crate::{
7    jsonld::JsonLdParseError,
8    model::{NamedNode, Object, Predicate, Quad, Subject, Triple},
9    optimization::{SimdJsonProcessor, TermInterner, TermInternerExt, ZeroCopyBuffer},
10};
11// Removed unused async_trait::async_trait import
12use dashmap::DashMap;
13// Removed unused futures::{SinkExt, StreamExt} imports
14use parking_lot::Mutex;
15#[cfg(feature = "parallel")]
16use rayon::prelude::*;
17use serde_json::{Map, Value};
18use std::{
19    collections::VecDeque,
20    error::Error as StdError,
21    sync::{
22        atomic::{AtomicUsize, Ordering},
23        Arc,
24    },
25};
26use tokio::{
27    io::{AsyncRead, AsyncReadExt, BufReader},
28    sync::{mpsc, RwLock, Semaphore},
29    time::{Duration, Instant},
30};
31
32/// Ultra-high performance streaming JSON-LD parser with adaptive optimizations
33pub struct UltraStreamingJsonLdParser {
34    config: StreamingConfig,
35    context_cache: Arc<DashMap<String, Arc<Value>>>,
36    term_interner: Arc<TermInterner>,
37    performance_monitor: Arc<PerformanceMonitor>,
38    simd_processor: SimdJsonProcessor,
39    buffer_pool: Arc<BufferPool>,
40}
41
42/// Advanced configuration for streaming JSON-LD processing
43#[derive(Debug, Clone)]
44pub struct StreamingConfig {
45    /// Chunk size for reading data (adaptive)
46    pub chunk_size: usize,
47    /// Maximum number of concurrent processing threads
48    pub max_concurrent_threads: usize,
49    /// Buffer size for intermediate processing
50    pub buffer_size: usize,
51    /// Enable SIMD acceleration for JSON parsing
52    pub enable_simd: bool,
53    /// Context caching configuration
54    pub context_cache_size: usize,
55    /// Adaptive buffering threshold
56    pub adaptive_threshold: f64,
57    /// Memory pressure detection
58    pub memory_pressure_threshold: usize,
59    /// Zero-copy optimization level
60    pub zero_copy_level: ZeroCopyLevel,
61    /// Performance profiling enabled
62    pub enable_profiling: bool,
63}
64
/// How aggressively zero-copy optimizations should be applied.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ZeroCopyLevel {
    /// Zero-copy optimizations are disabled.
    None,
    /// Zero-copy for string references only.
    Basic,
    /// Zero-copy combined with arena allocation.
    Advanced,
    /// Most aggressive level, using custom allocators.
    Maximum,
}
77
78/// Real-time performance monitoring for streaming operations
79pub struct PerformanceMonitor {
80    total_bytes_processed: AtomicUsize,
81    total_triples_parsed: AtomicUsize,
82    parse_errors: AtomicUsize,
83    context_cache_hits: AtomicUsize,
84    context_cache_misses: AtomicUsize,
85    simd_operations: AtomicUsize,
86    zero_copy_operations: AtomicUsize,
87    start_time: Instant,
88    chunk_processing_times: Arc<Mutex<VecDeque<Duration>>>,
89}
90
91/// Adaptive buffer pool for high-throughput processing
92pub struct BufferPool {
93    available_buffers: Arc<Mutex<Vec<ZeroCopyBuffer>>>,
94    buffer_size: usize,
95    max_buffers: usize,
96    current_buffers: AtomicUsize,
97}
98
99/// High-performance streaming sink for processed triples
100#[async_trait::async_trait]
101pub trait StreamingSink: Send + Sync {
102    type Error: Send + Sync + std::error::Error + 'static;
103
104    async fn process_triple_batch(&mut self, triples: Vec<Triple>) -> Result<(), Self::Error>;
105    async fn process_quad_batch(&mut self, quads: Vec<Quad>) -> Result<(), Self::Error>;
106    async fn flush(&mut self) -> Result<(), Self::Error>;
107    fn performance_statistics(&self) -> SinkStatistics;
108}
109
/// Counters a [`StreamingSink`] exposes about the work it has done.
#[derive(Debug, Clone)]
pub struct SinkStatistics {
    /// Number of triples received so far.
    pub total_triples_processed: usize,
    /// Number of quads received so far.
    pub total_quads_processed: usize,
    /// Typical number of items per batch.
    pub average_batch_size: f64,
    /// Items handled per second.
    pub processing_rate_per_second: f64,
    /// Memory held by the sink, in bytes.
    pub memory_usage_bytes: usize,
}
119
120impl Default for StreamingConfig {
121    fn default() -> Self {
122        Self {
123            chunk_size: 64 * 1024, // 64KB adaptive starting point
124            max_concurrent_threads: num_cpus::get() * 2,
125            buffer_size: 1024 * 1024, // 1MB buffer
126            enable_simd: true,
127            context_cache_size: 10000,
128            adaptive_threshold: 0.8,
129            memory_pressure_threshold: 8 * 1024 * 1024 * 1024, // 8GB
130            zero_copy_level: ZeroCopyLevel::Advanced,
131            enable_profiling: true,
132        }
133    }
134}
135
136impl UltraStreamingJsonLdParser {
137    /// Create a new ultra-performance streaming parser
138    pub fn new(config: StreamingConfig) -> Self {
139        Self {
140            context_cache: Arc::new(DashMap::with_capacity(config.context_cache_size)),
141            term_interner: Arc::new(TermInterner::new()),
142            performance_monitor: Arc::new(PerformanceMonitor::new()),
143            simd_processor: SimdJsonProcessor::new(),
144            buffer_pool: Arc::new(BufferPool::new(config.buffer_size, 100)),
145            config,
146        }
147    }
148
149    /// Stream parse JSON-LD with ultra-high performance optimizations
150    pub async fn stream_parse<R, S>(
151        &mut self,
152        reader: R,
153        mut sink: S,
154    ) -> Result<StreamingStatistics, JsonLdParseError>
155    where
156        R: AsyncRead + Unpin + Send + 'static,
157        S: StreamingSink + Send + 'static,
158        S::Error: 'static,
159    {
160        let mut buf_reader = BufReader::with_capacity(self.config.chunk_size, reader);
161        let (tx, mut rx) = mpsc::channel::<ProcessingChunk>(self.config.buffer_size);
162        let (triple_tx, mut triple_rx) = mpsc::channel::<Vec<Triple>>(100);
163        let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_threads));
164
165        // Spawn sink processing task
166        let sink_handle = tokio::spawn(async move {
167            while let Some(batch) = triple_rx.recv().await {
168                sink.process_triple_batch(batch)
169                    .await
170                    .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;
171            }
172
173            sink.flush()
174                .await
175                .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;
176
177            Ok::<(), JsonLdParseError>(())
178        });
179
180        // Spawn parallel processing tasks
181        let processing_handle = tokio::spawn({
182            let config = self.config.clone();
183            let context_cache = Arc::clone(&self.context_cache);
184            let term_interner = Arc::clone(&self.term_interner);
185            let performance_monitor = Arc::clone(&self.performance_monitor);
186            let simd_processor = self.simd_processor.clone();
187            let triple_tx = triple_tx.clone();
188
189            async move {
190                let mut batch_buffer = Vec::with_capacity(config.buffer_size);
191
192                while let Some(chunk) = rx.recv().await {
193                    let _permit = semaphore
194                        .acquire()
195                        .await
196                        .expect("semaphore should not be closed");
197
198                    // Process chunk with SIMD acceleration if available
199                    let processed_triples = if config.enable_simd {
200                        Self::process_chunk_simd(
201                            chunk,
202                            &context_cache,
203                            &term_interner,
204                            &simd_processor,
205                        )
206                        .await?
207                    } else {
208                        Self::process_chunk_standard(chunk, &context_cache, &term_interner).await?
209                    };
210
211                    performance_monitor.record_triples_parsed(processed_triples.len());
212
213                    batch_buffer.extend(processed_triples);
214
215                    // Adaptive batching based on performance metrics
216                    if batch_buffer.len() >= config.buffer_size
217                        || performance_monitor.should_flush_batch()
218                    {
219                        triple_tx
220                            .send(std::mem::take(&mut batch_buffer))
221                            .await
222                            .map_err(|_| {
223                                JsonLdParseError::ProcessingError(
224                                    "Triple channel send failed".to_string(),
225                                )
226                            })?;
227                    }
228                }
229
230                // Flush remaining triples
231                if !batch_buffer.is_empty() {
232                    triple_tx.send(batch_buffer).await.map_err(|_| {
233                        JsonLdParseError::ProcessingError("Triple channel send failed".to_string())
234                    })?;
235                }
236
237                Ok::<(), JsonLdParseError>(())
238            }
239        });
240
241        // Read and chunk data adaptively
242        let mut buffer = self.buffer_pool.get_buffer().await;
243        let mut total_bytes = 0;
244
245        loop {
246            match buf_reader.read(buffer.as_mut_slice()).await {
247                Ok(0) => break, // EOF
248                Ok(n) => {
249                    buffer.set_len(n);
250                    total_bytes += n;
251                    self.performance_monitor.record_bytes_processed(n);
252
253                    // Adaptive chunk size adjustment
254                    if self.should_adjust_chunk_size(n) {
255                        self.adjust_chunk_size_adaptive().await;
256                    }
257
258                    let chunk = ProcessingChunk {
259                        data: buffer.as_slice().to_vec(),
260                        timestamp: Instant::now(),
261                        sequence_id: total_bytes,
262                    };
263
264                    tx.send(chunk).await.map_err(|_| {
265                        JsonLdParseError::ProcessingError("Channel send failed".to_string())
266                    })?;
267
268                    buffer = self.buffer_pool.get_buffer().await;
269                }
270                Err(e) => return Err(JsonLdParseError::Io(e)),
271            }
272        }
273
274        drop(tx); // Signal completion to processing task
275        processing_handle
276            .await
277            .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))??;
278
279        drop(triple_tx); // Signal completion to sink task
280        sink_handle
281            .await
282            .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))??;
283
284        Ok(self.performance_monitor.get_statistics())
285    }
286
287    /// Process chunk with SIMD acceleration
288    async fn process_chunk_simd(
289        chunk: ProcessingChunk,
290        context_cache: &DashMap<String, Arc<Value>>,
291        term_interner: &TermInterner,
292        simd_processor: &SimdJsonProcessor,
293    ) -> Result<Vec<Triple>, JsonLdParseError> {
294        let start = Instant::now();
295
296        // SIMD-accelerated JSON parsing
297        let json_value = simd_processor
298            .parse_json(&chunk.data)
299            .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;
300
301        // Zero-copy context resolution
302        let context = Self::resolve_context_zero_copy(&json_value, context_cache).await?;
303
304        // Parallel triple extraction with work-stealing
305        #[cfg(feature = "parallel")]
306        let triples = Self::extract_triples_parallel(&json_value, &context, term_interner).await?;
307        #[cfg(not(feature = "parallel"))]
308        let triples = Self::extract_triples_standard(&json_value, &context, term_interner).await?;
309
310        // Record performance metrics
311        let _processing_time = start.elapsed();
312        // performance_monitor.record_chunk_processing_time(processing_time);
313
314        Ok(triples)
315    }
316
317    /// Process chunk with standard methods
318    async fn process_chunk_standard(
319        chunk: ProcessingChunk,
320        context_cache: &DashMap<String, Arc<Value>>,
321        term_interner: &TermInterner,
322    ) -> Result<Vec<Triple>, JsonLdParseError> {
323        // Standard JSON parsing
324        let json_value: Value = serde_json::from_slice(&chunk.data)
325            .map_err(|e| JsonLdParseError::ProcessingError(e.to_string()))?;
326
327        // Context resolution with caching
328        let context = Self::resolve_context_cached(&json_value, context_cache).await?;
329
330        // Triple extraction
331        let triples = Self::extract_triples_standard(&json_value, &context, term_interner).await?;
332
333        Ok(triples)
334    }
335
336    /// Zero-copy context resolution
337    async fn resolve_context_zero_copy(
338        json_value: &Value,
339        context_cache: &DashMap<String, Arc<Value>>,
340    ) -> Result<Arc<Value>, JsonLdParseError> {
341        if let Some(context_ref) = json_value.get("@context") {
342            if let Some(context_str) = context_ref.as_str() {
343                if let Some(cached_context) = context_cache.get(context_str) {
344                    return Ok(Arc::clone(&cached_context));
345                }
346
347                // Resolve and cache context
348                let resolved_context = Self::resolve_remote_context(context_str).await?;
349                let context_arc = Arc::new(resolved_context);
350                context_cache.insert(context_str.to_string(), Arc::clone(&context_arc));
351                return Ok(context_arc);
352            }
353        }
354
355        // Default context
356        Ok(Arc::new(Value::Object(Map::new())))
357    }
358
359    /// Cached context resolution
360    async fn resolve_context_cached(
361        json_value: &Value,
362        context_cache: &DashMap<String, Arc<Value>>,
363    ) -> Result<Arc<Value>, JsonLdParseError> {
364        // Similar to zero-copy but with different optimization strategy
365        Self::resolve_context_zero_copy(json_value, context_cache).await
366    }
367
368    /// Parallel triple extraction with work-stealing
369    #[cfg(feature = "parallel")]
370    async fn extract_triples_parallel(
371        json_value: &Value,
372        context: &Value,
373        term_interner: &TermInterner,
374    ) -> Result<Vec<Triple>, JsonLdParseError> {
375        if let Value::Array(objects) = json_value {
376            // Parallel processing of JSON-LD objects
377            let triples: Result<Vec<Vec<Triple>>, JsonLdParseError> = objects
378                .par_iter()
379                .map(|obj| Self::extract_triples_from_object(obj, context, term_interner))
380                .collect();
381
382            Ok(triples?.into_iter().flatten().collect())
383        } else {
384            Self::extract_triples_from_object(json_value, context, term_interner)
385        }
386    }
387
388    /// Standard triple extraction
389    async fn extract_triples_standard(
390        json_value: &Value,
391        context: &Value,
392        term_interner: &TermInterner,
393    ) -> Result<Vec<Triple>, JsonLdParseError> {
394        Self::extract_triples_from_object(json_value, context, term_interner)
395    }
396
397    /// Extract triples from a single JSON-LD object
398    fn extract_triples_from_object(
399        obj: &Value,
400        context: &Value,
401        term_interner: &TermInterner,
402    ) -> Result<Vec<Triple>, JsonLdParseError> {
403        let mut triples = Vec::new();
404
405        if let Value::Object(map) = obj {
406            // Extract subject
407            let subject: Subject = if let Some(id) = map.get("@id") {
408                Subject::NamedNode(term_interner.intern_named_node(id.as_str().ok_or_else(
409                    || JsonLdParseError::ProcessingError("Invalid @id".to_string()),
410                )?)?)
411            } else {
412                // Generate blank node
413                Subject::BlankNode(term_interner.intern_blank_node())
414            };
415
416            // Process properties
417            for (key, value) in map {
418                if key.starts_with('@') {
419                    continue; // Skip JSON-LD keywords
420                }
421
422                // Expand property IRI using context
423                let predicate_iri = Self::expand_property(key, context)?;
424                let predicate = term_interner.intern_named_node(&predicate_iri)?;
425
426                // Process values
427                match value {
428                    Value::Array(values) => {
429                        for val in values {
430                            if let Some(triple) = Self::create_triple_from_value(
431                                subject.clone(),
432                                predicate.clone(),
433                                val,
434                                context,
435                                term_interner,
436                            )? {
437                                triples.push(triple);
438                            }
439                        }
440                    }
441                    _ => {
442                        if let Some(triple) = Self::create_triple_from_value(
443                            subject.clone(),
444                            predicate,
445                            value,
446                            context,
447                            term_interner,
448                        )? {
449                            triples.push(triple);
450                        }
451                    }
452                }
453            }
454        }
455
456        Ok(triples)
457    }
458
459    /// Create triple from JSON-LD value
460    fn create_triple_from_value(
461        subject: Subject,
462        predicate: NamedNode,
463        value: &Value,
464        _context: &Value,
465        term_interner: &TermInterner,
466    ) -> Result<Option<Triple>, JsonLdParseError> {
467        let object: Object = match value {
468            Value::String(s) => {
469                // Check if it's an IRI or literal
470                if s.starts_with("http://") || s.starts_with("https://") {
471                    Object::NamedNode(term_interner.intern_named_node(s)?)
472                } else {
473                    Object::Literal(term_interner.intern_literal(s)?)
474                }
475            }
476            Value::Object(obj) => {
477                if let Some(id) = obj.get("@id") {
478                    // Object reference
479                    Object::NamedNode(term_interner.intern_named_node(id.as_str().ok_or_else(
480                        || JsonLdParseError::ProcessingError("Invalid @id in object".to_string()),
481                    )?)?)
482                } else if let Some(val) = obj.get("@value") {
483                    // Typed literal
484                    let literal_value = val.as_str().ok_or_else(|| {
485                        JsonLdParseError::ProcessingError("Invalid @value".to_string())
486                    })?;
487
488                    if let Some(datatype) = obj.get("@type") {
489                        let datatype_iri = datatype.as_str().ok_or_else(|| {
490                            JsonLdParseError::ProcessingError("Invalid @type".to_string())
491                        })?;
492                        Object::Literal(
493                            term_interner
494                                .intern_literal_with_datatype(literal_value, datatype_iri)?,
495                        )
496                    } else if let Some(lang) = obj.get("@language") {
497                        let language = lang.as_str().ok_or_else(|| {
498                            JsonLdParseError::ProcessingError("Invalid @language".to_string())
499                        })?;
500                        Object::Literal(
501                            term_interner.intern_literal_with_language(literal_value, language)?,
502                        )
503                    } else {
504                        Object::Literal(term_interner.intern_literal(literal_value)?)
505                    }
506                } else {
507                    return Ok(None); // Skip complex nested objects for now
508                }
509            }
510            Value::Number(n) => Object::Literal(term_interner.intern_literal(&n.to_string())?),
511            Value::Bool(b) => Object::Literal(term_interner.intern_literal(&b.to_string())?),
512            _ => return Ok(None),
513        };
514
515        Ok(Some(Triple::new(
516            subject,
517            Predicate::NamedNode(predicate),
518            object,
519        )))
520    }
521
522    /// Expand property using JSON-LD context
523    fn expand_property(property: &str, context: &Value) -> Result<String, JsonLdParseError> {
524        // Simplified context expansion - in real implementation this would be more complex
525        if property.contains(':') {
526            Ok(property.to_string())
527        } else if let Value::Object(ctx) = context {
528            if let Some(expanded) = ctx.get(property) {
529                if let Some(iri) = expanded.as_str() {
530                    Ok(iri.to_string())
531                } else {
532                    Ok(format!("http://example.org/{property}"))
533                }
534            } else {
535                Ok(format!("http://example.org/{property}"))
536            }
537        } else {
538            Ok(format!("http://example.org/{property}"))
539        }
540    }
541
542    /// Resolve remote context (simplified)
543    async fn resolve_remote_context(_context_iri: &str) -> Result<Value, JsonLdParseError> {
544        // In real implementation, this would fetch remote contexts
545        // For now, return empty context
546        Ok(Value::Object(Map::new()))
547    }
548
549    /// Check if chunk size should be adjusted
550    fn should_adjust_chunk_size(&self, bytes_read: usize) -> bool {
551        let target_size = self.config.chunk_size;
552        let threshold = (target_size as f64 * self.config.adaptive_threshold) as usize;
553        bytes_read < threshold || bytes_read > target_size * 2
554    }
555
556    /// Adaptively adjust chunk size based on performance
557    async fn adjust_chunk_size_adaptive(&mut self) {
558        let avg_processing_time = self.performance_monitor.average_chunk_processing_time();
559        let memory_pressure = self.performance_monitor.memory_pressure_detected();
560
561        if memory_pressure {
562            self.config.chunk_size = (self.config.chunk_size / 2).max(1024);
563        } else if avg_processing_time < Duration::from_millis(10) {
564            self.config.chunk_size = (self.config.chunk_size * 2).min(1024 * 1024);
565        }
566    }
567}
568
/// One unit of input handed from the reader to the processing task.
#[derive(Debug)]
struct ProcessingChunk {
    /// Bytes copied out of the read buffer.
    data: Vec<u8>,
    /// Moment the chunk was created.
    #[allow(dead_code)]
    timestamp: Instant,
    /// Identifier assigned by the reader.
    #[allow(dead_code)]
    sequence_id: usize,
}
578
/// Summary of a streaming parse run.
#[derive(Debug, Clone)]
pub struct StreamingStatistics {
    /// Bytes read from the input.
    pub total_bytes_processed: usize,
    /// Triples produced by chunk processing.
    pub total_triples_parsed: usize,
    /// Time elapsed since the performance monitor was created.
    pub processing_time: Duration,
    /// Bytes per second, scaled by `1024 * 1024`.
    pub average_throughput_mbps: f64,
    /// Number of recorded parse errors.
    pub parse_errors: usize,
    /// Context cache hits divided by all lookups; `0.0` without lookups.
    pub context_cache_hit_ratio: f64,
    /// Number of recorded SIMD operations.
    pub simd_operations_count: usize,
    /// Number of recorded zero-copy operations.
    pub zero_copy_operations_count: usize,
}
591
592impl PerformanceMonitor {
593    fn new() -> Self {
594        Self {
595            total_bytes_processed: AtomicUsize::new(0),
596            total_triples_parsed: AtomicUsize::new(0),
597            parse_errors: AtomicUsize::new(0),
598            context_cache_hits: AtomicUsize::new(0),
599            context_cache_misses: AtomicUsize::new(0),
600            simd_operations: AtomicUsize::new(0),
601            zero_copy_operations: AtomicUsize::new(0),
602            start_time: Instant::now(),
603            chunk_processing_times: Arc::new(Mutex::new(VecDeque::with_capacity(1000))),
604        }
605    }
606
607    fn record_bytes_processed(&self, bytes: usize) {
608        self.total_bytes_processed
609            .fetch_add(bytes, Ordering::Relaxed);
610    }
611
612    fn record_triples_parsed(&self, count: usize) {
613        self.total_triples_parsed
614            .fetch_add(count, Ordering::Relaxed);
615    }
616
617    fn should_flush_batch(&self) -> bool {
618        // Adaptive flushing logic based on performance metrics
619        self.average_chunk_processing_time() > Duration::from_millis(100)
620    }
621
622    fn average_chunk_processing_time(&self) -> Duration {
623        let times = self.chunk_processing_times.lock();
624        if times.is_empty() {
625            return Duration::from_millis(1);
626        }
627
628        let total: Duration = times.iter().sum();
629        total / times.len() as u32
630    }
631
632    fn memory_pressure_detected(&self) -> bool {
633        // Simplified memory pressure detection
634        false // Implementation would check actual memory usage
635    }
636
637    fn get_statistics(&self) -> StreamingStatistics {
638        let elapsed = self.start_time.elapsed();
639        let bytes = self.total_bytes_processed.load(Ordering::Relaxed);
640        let triples = self.total_triples_parsed.load(Ordering::Relaxed);
641        let errors = self.parse_errors.load(Ordering::Relaxed);
642        let cache_hits = self.context_cache_hits.load(Ordering::Relaxed);
643        let cache_misses = self.context_cache_misses.load(Ordering::Relaxed);
644        let simd_ops = self.simd_operations.load(Ordering::Relaxed);
645        let zero_copy_ops = self.zero_copy_operations.load(Ordering::Relaxed);
646
647        let throughput_mbps = if elapsed.as_secs() > 0 {
648            (bytes as f64) / (1024.0 * 1024.0) / elapsed.as_secs_f64()
649        } else {
650            0.0
651        };
652
653        let cache_hit_ratio = if cache_hits + cache_misses > 0 {
654            cache_hits as f64 / (cache_hits + cache_misses) as f64
655        } else {
656            0.0
657        };
658
659        StreamingStatistics {
660            total_bytes_processed: bytes,
661            total_triples_parsed: triples,
662            processing_time: elapsed,
663            average_throughput_mbps: throughput_mbps,
664            parse_errors: errors,
665            context_cache_hit_ratio: cache_hit_ratio,
666            simd_operations_count: simd_ops,
667            zero_copy_operations_count: zero_copy_ops,
668        }
669    }
670}
671
672impl BufferPool {
673    fn new(buffer_size: usize, max_buffers: usize) -> Self {
674        Self {
675            available_buffers: Arc::new(Mutex::new(Vec::with_capacity(max_buffers))),
676            buffer_size,
677            max_buffers,
678            current_buffers: AtomicUsize::new(0),
679        }
680    }
681
682    async fn get_buffer(&self) -> ZeroCopyBuffer {
683        loop {
684            // Try to get a buffer without waiting
685            {
686                let mut buffers = self.available_buffers.lock();
687                if let Some(buffer) = buffers.pop() {
688                    return buffer;
689                }
690            } // MutexGuard dropped here
691
692            if self.current_buffers.load(Ordering::Relaxed) < self.max_buffers {
693                self.current_buffers.fetch_add(1, Ordering::Relaxed);
694                return ZeroCopyBuffer::new(self.buffer_size);
695            } else {
696                // Wait for a buffer to become available
697                tokio::time::sleep(Duration::from_millis(1)).await;
698            }
699        }
700    }
701
702    #[allow(dead_code)]
703    fn return_buffer(&self, mut buffer: ZeroCopyBuffer) {
704        buffer.reset();
705        let mut buffers = self.available_buffers.lock();
706        if buffers.len() < self.max_buffers {
707            buffers.push(buffer);
708        } else {
709            self.current_buffers.fetch_sub(1, Ordering::Relaxed);
710        }
711    }
712}
713
714/// Memory-efficient sink that accumulates triples in memory
715pub struct MemoryStreamingSink {
716    triples: Arc<RwLock<Vec<Triple>>>,
717    quads: Arc<RwLock<Vec<Quad>>>,
718    statistics: Arc<RwLock<SinkStatistics>>,
719}
720
721impl Default for MemoryStreamingSink {
722    fn default() -> Self {
723        Self::new()
724    }
725}
726
727impl MemoryStreamingSink {
728    pub fn new() -> Self {
729        Self {
730            triples: Arc::new(RwLock::new(Vec::new())),
731            quads: Arc::new(RwLock::new(Vec::new())),
732            statistics: Arc::new(RwLock::new(SinkStatistics {
733                total_triples_processed: 0,
734                total_quads_processed: 0,
735                average_batch_size: 0.0,
736                processing_rate_per_second: 0.0,
737                memory_usage_bytes: 0,
738            })),
739        }
740    }
741
742    pub fn into_triples(self) -> Arc<RwLock<Vec<Triple>>> {
743        self.triples
744    }
745
746    pub async fn get_triples(&self) -> Vec<Triple> {
747        self.triples.read().await.clone()
748    }
749
750    pub async fn get_quads(&self) -> Vec<Quad> {
751        self.quads.read().await.clone()
752    }
753}
754
/// Error reported by streaming sinks; wraps an arbitrary boxed error.
#[derive(Debug)]
pub struct StreamingError(Box<dyn StdError + Send + Sync>);

impl std::fmt::Display for StreamingError {
    /// Writes the wrapped error's message prefixed with `Streaming error: `.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("Streaming error: {}", self.0))
    }
}

impl StdError for StreamingError {
    /// Exposes the wrapped error as the source.
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        let inner: &(dyn StdError + 'static) = &*self.0;
        Some(inner)
    }
}

impl From<Box<dyn StdError + Send + Sync>> for StreamingError {
    /// Wraps an already boxed error.
    fn from(err: Box<dyn StdError + Send + Sync>) -> Self {
        Self(err)
    }
}
776
777#[async_trait::async_trait]
778impl StreamingSink for MemoryStreamingSink {
779    type Error = StreamingError;
780
781    async fn process_triple_batch(&mut self, triples: Vec<Triple>) -> Result<(), Self::Error> {
782        let batch_size = triples.len();
783        self.triples.write().await.extend(triples);
784
785        let mut stats = self.statistics.write().await;
786        stats.total_triples_processed += batch_size;
787        stats.average_batch_size = (stats.average_batch_size + batch_size as f64) / 2.0;
788
789        Ok(())
790    }
791
792    async fn process_quad_batch(&mut self, quads: Vec<Quad>) -> Result<(), Self::Error> {
793        let batch_size = quads.len();
794        self.quads.write().await.extend(quads);
795
796        let mut stats = self.statistics.write().await;
797        stats.total_quads_processed += batch_size;
798
799        Ok(())
800    }
801
802    async fn flush(&mut self) -> Result<(), Self::Error> {
803        // Memory sink doesn't need explicit flushing
804        Ok(())
805    }
806
807    fn performance_statistics(&self) -> SinkStatistics {
808        // Would need to implement actual memory usage calculation
809        SinkStatistics {
810            total_triples_processed: 0,
811            total_quads_processed: 0,
812            average_batch_size: 0.0,
813            processing_rate_per_second: 0.0,
814            memory_usage_bytes: 0,
815        }
816    }
817}
818
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Smoke test: a small two-node document streams through the parser into
    /// a memory sink and the byte counter moves.
    #[tokio::test]
    async fn test_ultra_streaming_parser() {
        let json_ld_data = r#"[
            {
                "@id": "http://example.org/person/1",
                "name": "Alice",
                "age": 30
            },
            {
                "@id": "http://example.org/person/2", 
                "name": "Bob",
                "age": 25
            }
        ]"#;

        let mut parser = UltraStreamingJsonLdParser::new(StreamingConfig::default());
        let input = Cursor::new(json_ld_data.as_bytes());
        let sink = MemoryStreamingSink::new();

        // Keep a handle on the sink's storage; the sink itself is moved into the parser.
        let _sink_data = Arc::clone(&sink.triples);

        let outcome = parser.stream_parse(input, sink).await;
        let stats = outcome.expect("async operation should succeed");

        assert!(stats.total_bytes_processed > 0);
        // The triple count is not asserted yet: triple extraction for this
        // test document still needs work.
        // assert!(stats.total_triples_parsed > 0);
    }
}
857}