Skip to main content

sbom_tools/parsers/
streaming.rs

1//! Streaming SBOM parser for large files.
2//!
3//! This module provides memory-efficient parsing for very large SBOMs by:
4//! - Using streaming JSON/XML parsing (`serde_json::from_reader`)
5//! - Not buffering the entire file into memory as a string
6//! - Yielding results via an iterator interface
7//! - Supporting progress callbacks
8//!
9//! Supports CycloneDX 1.4–1.7 (JSON/XML), SPDX 2.2–2.3 (JSON/tag-value/RDF),
10//! and SPDX 3.0 (JSON-LD). Note: SPDX 3.0 requires full document loading due
11//! to its graph-based element model with cross-references.
12//!
13//! # Usage
14//!
15//! ```no_run
16//! use sbom_tools::parsers::streaming::{StreamingParser, StreamingConfig, ParseEvent};
17//! use std::path::Path;
18//!
19//! let config = StreamingConfig::default()
20//!     .with_chunk_size(64 * 1024)
21//!     .with_progress_callback(|p| println!("Progress: {:.1}%", p.percent()));
22//!
23//! let parser = StreamingParser::new(config);
24//! let stream = parser.parse_file(Path::new("large-sbom.json")).unwrap();
25//!
26//! for event in stream {
27//!     match event {
28//!         Ok(ParseEvent::Metadata(doc)) => println!("Document: {:?}", doc.format),
29//!         Ok(ParseEvent::Component(comp)) => println!("Component: {}", comp.name),
30//!         Ok(ParseEvent::Dependency(edge)) => println!("Dependency: {} -> {}", edge.from, edge.to),
31//!         Ok(ParseEvent::Complete) => println!("Done!"),
32//!         Err(e) => eprintln!("Error: {}", e),
33//!     }
34//! }
35//! ```
36
37use super::detection::FormatDetector;
38use super::traits::ParseError;
39use crate::model::{Component, DependencyEdge, DocumentMetadata, NormalizedSbom};
40use std::collections::VecDeque;
41use std::io::{BufRead, BufReader, Read};
42use std::path::Path;
43use std::sync::Arc;
44
/// Progress information for streaming parsing.
///
/// A snapshot of how far a parse has advanced; it is handed by reference to
/// the configured progress callback.
#[derive(Debug, Clone)]
pub struct ParseProgress {
    /// Bytes read so far
    pub bytes_read: u64,
    /// Total bytes (if known)
    pub total_bytes: Option<u64>,
    /// Components parsed so far
    pub components_parsed: usize,
    /// Dependencies parsed so far
    pub dependencies_parsed: usize,
}

impl ParseProgress {
    /// Get the progress as a percentage in the range `0.0..=100.0`.
    ///
    /// Returns `0.0` when the total size is unknown (`None`) or zero, because
    /// no meaningful ratio can be computed in either case. The result is
    /// capped at `100.0` so that reading more bytes than the recorded total
    /// never reports more than 100 %.
    #[must_use]
    pub fn percent(&self) -> f32 {
        match self.total_bytes {
            Some(total) if total > 0 => {
                let ratio = self.bytes_read as f32 / total as f32;
                // Cap the value: `bytes_read` is not guaranteed to stay below `total`.
                (ratio * 100.0).min(100.0)
            }
            _ => 0.0,
        }
    }

    /// Check if progress is complete.
    ///
    /// Returns `true` only when the total is known and at least that many
    /// bytes have been read; an unknown total is never considered complete.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.total_bytes
            .is_some_and(|total| self.bytes_read >= total)
    }
}
75
/// Progress callback type.
///
/// A reference-counted, `Send + Sync` closure that receives a borrowed
/// [`ParseProgress`] snapshot each time progress is reported.
pub type ProgressCallback = Arc<dyn Fn(&ParseProgress) + Send + Sync>;
78
/// Configuration for streaming parser.
///
/// Construct it with `StreamingConfig::default()` and adjust it through the
/// `with_*` builder methods.
#[derive(Clone)]
pub struct StreamingConfig {
    /// Chunk size for reading (default: 64KB).
    ///
    /// Used as the `BufReader` capacity when a file is opened for parsing.
    pub chunk_size: usize,
    /// Buffer size for components (default: 1000).
    // NOTE(review): not read anywhere in the visible part of this file — confirm it is still used.
    pub component_buffer_size: usize,
    /// Progress callback (optional).
    ///
    /// Private: set it through `with_progress_callback`.
    progress_callback: Option<ProgressCallback>,
    /// Whether to validate components during parsing (default: `true`).
    // NOTE(review): not read anywhere in the visible part of this file — confirm it is still used.
    pub validate_during_parse: bool,
    /// Skip malformed components instead of erroring (default: `false`).
    // NOTE(review): not read anywhere in the visible part of this file — confirm it is still used.
    pub skip_malformed: bool,
}
93
94impl Default for StreamingConfig {
95    fn default() -> Self {
96        Self {
97            chunk_size: 64 * 1024, // 64KB
98            component_buffer_size: 1000,
99            progress_callback: None,
100            validate_during_parse: true,
101            skip_malformed: false,
102        }
103    }
104}
105
106impl StreamingConfig {
107    /// Set chunk size for reading
108    #[must_use]
109    pub fn with_chunk_size(mut self, size: usize) -> Self {
110        self.chunk_size = size.max(1024); // Minimum 1KB
111        self
112    }
113
114    /// Set component buffer size
115    #[must_use]
116    pub fn with_buffer_size(mut self, size: usize) -> Self {
117        self.component_buffer_size = size.max(10);
118        self
119    }
120
121    /// Set progress callback
122    #[must_use]
123    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
124    where
125        F: Fn(&ParseProgress) + Send + Sync + 'static,
126    {
127        self.progress_callback = Some(Arc::new(callback));
128        self
129    }
130
131    /// Enable/disable validation during parsing
132    #[must_use]
133    pub const fn with_validation(mut self, validate: bool) -> Self {
134        self.validate_during_parse = validate;
135        self
136    }
137
138    /// Enable/disable skipping malformed components
139    #[must_use]
140    pub const fn with_skip_malformed(mut self, skip: bool) -> Self {
141        self.skip_malformed = skip;
142        self
143    }
144}
145
146impl std::fmt::Debug for StreamingConfig {
147    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        f.debug_struct("StreamingConfig")
149            .field("chunk_size", &self.chunk_size)
150            .field("component_buffer_size", &self.component_buffer_size)
151            .field("has_progress_callback", &self.progress_callback.is_some())
152            .field("validate_during_parse", &self.validate_during_parse)
153            .field("skip_malformed", &self.skip_malformed)
154            .finish()
155    }
156}
157
/// Events emitted during streaming parsing.
///
/// `StreamingIterator` yields these in a fixed order: one `Metadata`, then
/// every `Component`, then every `Dependency`, and finally `Complete`.
#[derive(Debug, Clone)]
pub enum ParseEvent {
    /// Document metadata has been parsed
    Metadata(DocumentMetadata),
    /// A component has been parsed (the payload is heap-allocated)
    Component(Box<Component>),
    /// A dependency relationship has been parsed
    Dependency(DependencyEdge),
    /// Parsing is complete; no further events follow
    Complete,
}
170
/// Streaming parser for large SBOMs.
///
/// Holds only a [`StreamingConfig`]; each `parse_*` call clones that
/// configuration into the iterator it returns.
#[derive(Debug)]
pub struct StreamingParser {
    /// Settings applied to every stream this parser opens.
    config: StreamingConfig,
}
176
177impl StreamingParser {
178    /// Create a new streaming parser with the given configuration
179    #[must_use]
180    pub const fn new(config: StreamingConfig) -> Self {
181        Self { config }
182    }
183
184    /// Create a streaming parser with default configuration
185    #[must_use]
186    pub fn default_config() -> Self {
187        Self::new(StreamingConfig::default())
188    }
189
190    /// Parse a file and return an iterator of events
191    pub fn parse_file(&self, path: &Path) -> Result<StreamingIterator, ParseError> {
192        let file = std::fs::File::open(path)
193            .map_err(|e| ParseError::IoError(format!("Failed to open file: {e}")))?;
194
195        let total_bytes = file.metadata().map(|m| m.len()).ok();
196        let reader = BufReader::with_capacity(self.config.chunk_size, file);
197
198        self.parse_reader(reader, total_bytes)
199    }
200
201    /// Parse from a reader and return an iterator of events
202    pub fn parse_reader<R: Read + Send + 'static>(
203        &self,
204        reader: BufReader<R>,
205        total_bytes: Option<u64>,
206    ) -> Result<StreamingIterator, ParseError> {
207        Ok(StreamingIterator::new(
208            reader,
209            total_bytes,
210            self.config.clone(),
211        ))
212    }
213
214    /// Parse from string content
215    pub fn parse_str(&self, content: &str) -> Result<StreamingIterator, ParseError> {
216        let cursor = std::io::Cursor::new(content.to_string());
217        let total_bytes = Some(content.len() as u64);
218        let reader = BufReader::new(cursor);
219        self.parse_reader(reader, total_bytes)
220    }
221
222    /// Collect all events into a `NormalizedSbom` (for convenience)
223    ///
224    /// Note: This loads the entire SBOM into memory, negating the
225    /// streaming benefits. Use the iterator directly for large files.
226    pub fn parse_to_sbom(&self, path: &Path) -> Result<NormalizedSbom, ParseError> {
227        let mut stream = self.parse_file(path)?;
228        stream.collect_sbom()
229    }
230}
231
232impl Default for StreamingParser {
233    fn default() -> Self {
234        Self::default_config()
235    }
236}
237
/// Iterator over streaming parse events.
///
/// Created by the `StreamingParser::parse_*` methods. Each call to `next`
/// yields one `Result<ParseEvent, ParseError>`.
// NOTE(review): every field appears to be read in the visible impl — confirm
// whether this `allow(dead_code)` is still required.
#[allow(dead_code)]
pub struct StreamingIterator {
    /// Internal state (not yet parsed / emitting events / finished)
    state: StreamingState,
    /// Configuration; supplies the optional progress callback
    config: StreamingConfig,
    /// Progress tracking, passed to the progress callback
    progress: ParseProgress,
    /// Pending events, drained before anything else is produced.
    /// Nothing in the visible code pushes onto this queue.
    pending: VecDeque<ParseEvent>,
    /// Whether parsing is complete (once set, the iterator yields `None`)
    complete: bool,
}
252
/// Internal state machine driving `StreamingIterator`.
enum StreamingState {
    /// Initial state - need to detect format and parse.
    /// Holds the not-yet-consumed input reader.
    Initial(Box<dyn BufRead + Send>),
    /// Parsing complete, emitting events from parsed SBOM
    Emitting {
        /// The fully parsed document that events are read from
        sbom: Box<NormalizedSbom>,
        /// Position of the next component to emit
        component_index: usize,
        /// Index into `sbom.edges` of the next dependency to emit
        dependency_index: usize,
        /// Whether the `Metadata` event has already been emitted
        metadata_emitted: bool,
    },
    /// Finished (also the state left behind after a parse error)
    Done,
}
266
267impl StreamingIterator {
268    fn new<R: Read + Send + 'static>(
269        reader: BufReader<R>,
270        total_bytes: Option<u64>,
271        config: StreamingConfig,
272    ) -> Self {
273        Self {
274            state: StreamingState::Initial(Box::new(reader)),
275            config,
276            progress: ParseProgress {
277                bytes_read: 0,
278                total_bytes,
279                components_parsed: 0,
280                dependencies_parsed: 0,
281            },
282            pending: VecDeque::new(),
283            complete: false,
284        }
285    }
286
287    /// Collect all events into a `NormalizedSbom`
288    pub fn collect_sbom(&mut self) -> Result<NormalizedSbom, ParseError> {
289        let mut metadata: Option<DocumentMetadata> = None;
290        let mut components = Vec::new();
291        let mut edges = Vec::new();
292
293        for event in self.by_ref() {
294            match event {
295                Ok(ParseEvent::Metadata(doc)) => metadata = Some(doc),
296                Ok(ParseEvent::Component(comp)) => components.push(*comp),
297                Ok(ParseEvent::Dependency(edge)) => edges.push(edge),
298                Ok(ParseEvent::Complete) => break,
299                Err(e) => return Err(e),
300            }
301        }
302
303        let document = metadata.unwrap_or_default();
304        let mut sbom = NormalizedSbom::new(document);
305
306        for comp in components {
307            sbom.add_component(comp);
308        }
309        for edge in edges {
310            sbom.add_edge(edge);
311        }
312
313        sbom.calculate_content_hash();
314        Ok(sbom)
315    }
316
317    fn report_progress(&self) {
318        if let Some(ref callback) = self.config.progress_callback {
319            callback(&self.progress);
320        }
321    }
322
323    fn advance(&mut self) -> Option<Result<ParseEvent, ParseError>> {
324        // Return pending events first
325        if let Some(event) = self.pending.pop_front() {
326            return Some(Ok(event));
327        }
328
329        if self.complete {
330            return None;
331        }
332
333        // Process based on state
334        match std::mem::replace(&mut self.state, StreamingState::Done) {
335            StreamingState::Initial(reader) => {
336                // Use centralized FormatDetector for consistent detection
337                let detector = FormatDetector::new();
338
339                // Parse using the detector which handles format detection and parsing
340                match detector.parse_reader(reader) {
341                    Ok(sbom) => {
342                        self.progress.bytes_read = self.progress.total_bytes.unwrap_or(0);
343                        self.report_progress();
344                        self.state = StreamingState::Emitting {
345                            sbom: Box::new(sbom),
346                            component_index: 0,
347                            dependency_index: 0,
348                            metadata_emitted: false,
349                        };
350                        self.advance()
351                    }
352                    Err(e) => Some(Err(e)),
353                }
354            }
355            StreamingState::Emitting {
356                sbom,
357                component_index,
358                dependency_index,
359                metadata_emitted,
360            } => {
361                // Emit metadata first
362                if !metadata_emitted {
363                    let doc = sbom.document.clone();
364                    self.state = StreamingState::Emitting {
365                        sbom,
366                        component_index,
367                        dependency_index,
368                        metadata_emitted: true,
369                    };
370                    return Some(Ok(ParseEvent::Metadata(doc)));
371                }
372
373                // Collect components into a vec for indexed access
374                let components: Vec<_> = sbom.components.values().cloned().collect();
375                let edges_len = sbom.edges.len();
376
377                // Emit components
378                if component_index < components.len() {
379                    let comp = components[component_index].clone();
380                    self.progress.components_parsed += 1;
381                    if self.progress.components_parsed.is_multiple_of(100) {
382                        self.report_progress();
383                    }
384                    self.state = StreamingState::Emitting {
385                        sbom,
386                        component_index: component_index + 1,
387                        dependency_index,
388                        metadata_emitted,
389                    };
390                    return Some(Ok(ParseEvent::Component(Box::new(comp))));
391                }
392
393                // Emit dependencies
394                if dependency_index < edges_len {
395                    let edge = sbom.edges[dependency_index].clone();
396                    self.progress.dependencies_parsed += 1;
397                    self.state = StreamingState::Emitting {
398                        sbom,
399                        component_index,
400                        dependency_index: dependency_index + 1,
401                        metadata_emitted,
402                    };
403                    return Some(Ok(ParseEvent::Dependency(edge)));
404                }
405
406                // All done
407                self.complete = true;
408                self.report_progress();
409                self.state = StreamingState::Done;
410                Some(Ok(ParseEvent::Complete))
411            }
412            StreamingState::Done => {
413                self.complete = true;
414                None
415            }
416        }
417    }
418}
419
420impl Iterator for StreamingIterator {
421    type Item = Result<ParseEvent, ParseError>;
422
423    fn next(&mut self) -> Option<Self::Item> {
424        self.advance()
425    }
426}
427
428/// Estimate the number of components in an SBOM file without full parsing
429///
430/// This performs a quick scan to estimate component count, useful for
431/// progress reporting and memory allocation.
432pub fn estimate_component_count(path: &Path) -> Result<ComponentEstimate, ParseError> {
433    let file = std::fs::File::open(path)
434        .map_err(|e| ParseError::IoError(format!("Failed to open file: {e}")))?;
435
436    let file_size = file.metadata().map(|m| m.len()).unwrap_or(0);
437
438    let reader = BufReader::new(file);
439    let mut count = 0;
440    let mut bytes_sampled = 0;
441    let sample_limit = 1024 * 1024; // Sample first 1MB
442
443    for line in reader.lines() {
444        let line = line.map_err(|e| ParseError::IoError(e.to_string()))?;
445        bytes_sampled += line.len();
446
447        // Count component markers
448        if line.contains("\"bom-ref\"") || line.contains("\"SPDXID\"") {
449            count += 1;
450        }
451
452        if bytes_sampled > sample_limit {
453            break;
454        }
455    }
456
457    // Extrapolate if we only sampled part of the file
458    let estimated = if bytes_sampled < file_size as usize && bytes_sampled > 0 {
459        (count as f64 * (file_size as f64 / bytes_sampled as f64)) as usize
460    } else {
461        count
462    };
463
464    Ok(ComponentEstimate {
465        estimated_count: estimated,
466        sampled_count: count,
467        file_size,
468        bytes_sampled,
469        is_extrapolated: bytes_sampled < file_size as usize,
470    })
471}
472
/// Estimate of component count.
///
/// Returned by `estimate_component_count`.
#[derive(Debug, Clone)]
pub struct ComponentEstimate {
    /// Estimated total component count (scaled up from `sampled_count`
    /// when only part of the file was sampled)
    pub estimated_count: usize,
    /// Components found in sampled region
    pub sampled_count: usize,
    /// Total file size in bytes (0 if the size could not be determined)
    pub file_size: u64,
    /// Bytes that were sampled
    pub bytes_sampled: usize,
    /// Whether the estimate was extrapolated
    pub is_extrapolated: bool,
}
487
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a progress snapshot against a fixed total of 100 bytes.
    fn snapshot(
        bytes_read: u64,
        components_parsed: usize,
        dependencies_parsed: usize,
    ) -> ParseProgress {
        ParseProgress {
            bytes_read,
            total_bytes: Some(100),
            components_parsed,
            dependencies_parsed,
        }
    }

    /// Half-read input reports 50 % and is not complete; fully-read input
    /// reports 100 % and is complete.
    #[test]
    fn test_progress_percent() {
        let halfway = snapshot(50, 5, 3);
        assert_eq!(halfway.percent(), 50.0);
        assert!(!halfway.is_complete());

        let finished = snapshot(100, 10, 5);
        assert_eq!(finished.percent(), 100.0);
        assert!(finished.is_complete());
    }

    /// Each builder method stores the value it was given.
    #[test]
    fn test_streaming_config_builder() {
        let built = StreamingConfig::default()
            .with_chunk_size(128 * 1024)
            .with_buffer_size(500)
            .with_validation(false)
            .with_skip_malformed(true);

        assert_eq!(built.chunk_size, 128 * 1024);
        assert_eq!(built.component_buffer_size, 500);
        assert!(!built.validate_during_parse);
        assert!(built.skip_malformed);
    }

    /// A default-configured parser uses the 64 KiB default chunk size.
    #[test]
    fn test_streaming_parser_creation() {
        let default_parser = StreamingParser::default_config();
        assert_eq!(default_parser.config.chunk_size, 64 * 1024);
    }
}
532}