Skip to main content

sbom_tools/parsers/
streaming.rs

1//! Streaming SBOM parser for large files.
2//!
3//! This module provides memory-efficient parsing for very large SBOMs by:
4//! - Using streaming JSON/XML parsing (`serde_json::from_reader`)
5//! - Not buffering the entire file into memory as a string
6//! - Yielding results via an iterator interface
7//! - Supporting progress callbacks
8//!
9//! # Usage
10//!
11//! ```no_run
12//! use sbom_tools::parsers::streaming::{StreamingParser, StreamingConfig, ParseEvent};
13//! use std::path::Path;
14//!
15//! let config = StreamingConfig::default()
16//!     .with_chunk_size(64 * 1024)
17//!     .with_progress_callback(|p| println!("Progress: {:.1}%", p.percent()));
18//!
19//! let parser = StreamingParser::new(config);
20//! let stream = parser.parse_file(Path::new("large-sbom.json")).unwrap();
21//!
22//! for event in stream {
23//!     match event {
24//!         Ok(ParseEvent::Metadata(doc)) => println!("Document: {:?}", doc.format),
25//!         Ok(ParseEvent::Component(comp)) => println!("Component: {}", comp.name),
26//!         Ok(ParseEvent::Dependency(edge)) => println!("Dependency: {} -> {}", edge.from, edge.to),
27//!         Ok(ParseEvent::Complete) => println!("Done!"),
28//!         Err(e) => eprintln!("Error: {}", e),
29//!     }
30//! }
31//! ```
32
33use super::detection::FormatDetector;
34use super::traits::ParseError;
35use crate::model::{Component, DependencyEdge, DocumentMetadata, NormalizedSbom};
36use std::collections::VecDeque;
37use std::io::{BufRead, BufReader, Read};
38use std::path::Path;
39use std::sync::Arc;
40
/// Progress information for streaming parsing.
///
/// A plain snapshot of the counters; progress callbacks receive it by
/// reference.
#[derive(Debug, Clone)]
pub struct ParseProgress {
    /// Bytes read so far
    pub bytes_read: u64,
    /// Total bytes (if known)
    pub total_bytes: Option<u64>,
    /// Components parsed so far
    pub components_parsed: usize,
    /// Dependencies parsed so far
    pub dependencies_parsed: usize,
}

impl ParseProgress {
    /// Get the progress as a percentage of `total_bytes`.
    ///
    /// Returns `0.0` when the total is unknown (`None`) or zero, so callers
    /// that need to distinguish "unknown" from "not started" must inspect
    /// `total_bytes` themselves. The value is not clamped: it exceeds `100.0`
    /// if `bytes_read` is larger than the total.
    #[must_use]
    pub fn percent(&self) -> f32 {
        match self.total_bytes {
            // Divide in f64: byte counts above 2^24 are not exactly
            // representable in f32, so casting them first would round both
            // operands before the division.
            Some(total) if total > 0 => (self.bytes_read as f64 / total as f64 * 100.0) as f32,
            _ => 0.0,
        }
    }

    /// Check if progress is complete.
    ///
    /// Returns `true` only when the total is known and `bytes_read` has
    /// reached (or passed) it; an unknown total is never complete.
    #[must_use]
    pub fn is_complete(&self) -> bool {
        self.total_bytes
            .is_some_and(|total| self.bytes_read >= total)
    }
}
71
/// Progress callback type.
///
/// A reference-counted closure that is handed a borrowed [`ParseProgress`]
/// snapshot each time progress is reported. The `Arc` makes the callback
/// cheap to clone together with the configuration that owns it, and the
/// `Send + Sync` bounds allow it to be shared across threads.
pub type ProgressCallback = Arc<dyn Fn(&ParseProgress) + Send + Sync>;
74
/// Configuration for streaming parser.
///
/// Start from [`StreamingConfig::default`] and adjust it with the `with_*`
/// builder methods. `Debug` is implemented by hand rather than derived, and
/// reports only whether a progress callback is set.
#[derive(Clone)]
pub struct StreamingConfig {
    /// Chunk size for reading (default: 64KB).
    ///
    /// Used as the `BufReader` capacity when a file is opened for parsing.
    /// The builder enforces a 1KB minimum; writing the field directly does not.
    pub chunk_size: usize,
    /// Buffer size for components (default: 1000).
    ///
    /// NOTE(review): nothing visible in this file reads this value — confirm
    /// where (or whether) it is consumed.
    pub component_buffer_size: usize,
    /// Progress callback (optional).
    ///
    /// Private: it can only be installed through `with_progress_callback`.
    progress_callback: Option<ProgressCallback>,
    /// Whether to validate components during parsing.
    ///
    /// NOTE(review): not consulted by the iterator code in this file —
    /// presumably honoured elsewhere; verify.
    pub validate_during_parse: bool,
    /// Skip malformed components instead of erroring.
    ///
    /// NOTE(review): not consulted by the iterator code in this file —
    /// presumably honoured elsewhere; verify.
    pub skip_malformed: bool,
}
89
90impl Default for StreamingConfig {
91    fn default() -> Self {
92        Self {
93            chunk_size: 64 * 1024, // 64KB
94            component_buffer_size: 1000,
95            progress_callback: None,
96            validate_during_parse: true,
97            skip_malformed: false,
98        }
99    }
100}
101
102impl StreamingConfig {
103    /// Set chunk size for reading
104    #[must_use]
105    pub fn with_chunk_size(mut self, size: usize) -> Self {
106        self.chunk_size = size.max(1024); // Minimum 1KB
107        self
108    }
109
110    /// Set component buffer size
111    #[must_use]
112    pub fn with_buffer_size(mut self, size: usize) -> Self {
113        self.component_buffer_size = size.max(10);
114        self
115    }
116
117    /// Set progress callback
118    #[must_use]
119    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
120    where
121        F: Fn(&ParseProgress) + Send + Sync + 'static,
122    {
123        self.progress_callback = Some(Arc::new(callback));
124        self
125    }
126
127    /// Enable/disable validation during parsing
128    #[must_use]
129    pub const fn with_validation(mut self, validate: bool) -> Self {
130        self.validate_during_parse = validate;
131        self
132    }
133
134    /// Enable/disable skipping malformed components
135    #[must_use]
136    pub const fn with_skip_malformed(mut self, skip: bool) -> Self {
137        self.skip_malformed = skip;
138        self
139    }
140}
141
142impl std::fmt::Debug for StreamingConfig {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        f.debug_struct("StreamingConfig")
145            .field("chunk_size", &self.chunk_size)
146            .field("component_buffer_size", &self.component_buffer_size)
147            .field("has_progress_callback", &self.progress_callback.is_some())
148            .field("validate_during_parse", &self.validate_during_parse)
149            .field("skip_malformed", &self.skip_malformed)
150            .finish()
151    }
152}
153
/// Events emitted during streaming parsing.
///
/// The iterator in this file produces them in a fixed order: one
/// `Metadata`, then every `Component`, then every `Dependency`, and a
/// final `Complete`.
#[derive(Debug, Clone)]
pub enum ParseEvent {
    /// Document metadata has been parsed
    Metadata(DocumentMetadata),
    /// A component has been parsed.
    ///
    /// Boxed so the payload lives on the heap — presumably because
    /// `Component` is much larger than the other variants; confirm.
    Component(Box<Component>),
    /// A dependency relationship has been parsed
    Dependency(DependencyEdge),
    /// Parsing is complete; no further events follow.
    Complete,
}
166
/// Streaming parser for large SBOMs.
///
/// Holds only a [`StreamingConfig`]; each `parse_*` call hands a clone of
/// that configuration to the iterator it returns, so one parser can be
/// reused for several inputs.
#[derive(Debug)]
pub struct StreamingParser {
    /// Settings copied into every iterator created by this parser.
    config: StreamingConfig,
}
172
173impl StreamingParser {
174    /// Create a new streaming parser with the given configuration
175    #[must_use]
176    pub const fn new(config: StreamingConfig) -> Self {
177        Self { config }
178    }
179
180    /// Create a streaming parser with default configuration
181    #[must_use]
182    pub fn default_config() -> Self {
183        Self::new(StreamingConfig::default())
184    }
185
186    /// Parse a file and return an iterator of events
187    pub fn parse_file(&self, path: &Path) -> Result<StreamingIterator, ParseError> {
188        let file = std::fs::File::open(path)
189            .map_err(|e| ParseError::IoError(format!("Failed to open file: {e}")))?;
190
191        let total_bytes = file.metadata().map(|m| m.len()).ok();
192        let reader = BufReader::with_capacity(self.config.chunk_size, file);
193
194        self.parse_reader(reader, total_bytes)
195    }
196
197    /// Parse from a reader and return an iterator of events
198    pub fn parse_reader<R: Read + Send + 'static>(
199        &self,
200        reader: BufReader<R>,
201        total_bytes: Option<u64>,
202    ) -> Result<StreamingIterator, ParseError> {
203        Ok(StreamingIterator::new(
204            reader,
205            total_bytes,
206            self.config.clone(),
207        ))
208    }
209
210    /// Parse from string content
211    pub fn parse_str(&self, content: &str) -> Result<StreamingIterator, ParseError> {
212        let cursor = std::io::Cursor::new(content.to_string());
213        let total_bytes = Some(content.len() as u64);
214        let reader = BufReader::new(cursor);
215        self.parse_reader(reader, total_bytes)
216    }
217
218    /// Collect all events into a `NormalizedSbom` (for convenience)
219    ///
220    /// Note: This loads the entire SBOM into memory, negating the
221    /// streaming benefits. Use the iterator directly for large files.
222    pub fn parse_to_sbom(&self, path: &Path) -> Result<NormalizedSbom, ParseError> {
223        let mut stream = self.parse_file(path)?;
224        stream.collect_sbom()
225    }
226}
227
228impl Default for StreamingParser {
229    fn default() -> Self {
230        Self::default_config()
231    }
232}
233
/// Iterator over streaming parse events.
///
/// Yields `Result<ParseEvent, ParseError>` items. The input is parsed on
/// the first call to `next`; later calls replay the parsed document as
/// individual events.
// NOTE(review): the blanket `dead_code` allowance presumably covers fields
// that are not yet fully used (e.g. most of `config`) — confirm and narrow it.
#[allow(dead_code)]
pub struct StreamingIterator {
    /// Internal state
    state: StreamingState,
    /// Configuration
    config: StreamingConfig,
    /// Progress tracking
    progress: ParseProgress,
    /// Pending events, drained before the state machine is advanced.
    ///
    /// NOTE(review): nothing visible in this file ever pushes to this queue.
    pending: VecDeque<ParseEvent>,
    /// Whether parsing is complete
    complete: bool,
}
248
/// State machine driving `StreamingIterator`.
///
/// Transitions are `Initial` -> `Emitting` -> `Done`; a parse error moves
/// straight from `Initial` to `Done`.
enum StreamingState {
    /// Initial state - need to detect format and parse.
    ///
    /// Owns the type-erased input reader until the first event is requested.
    Initial(Box<dyn BufRead + Send>),
    /// Parsing complete, emitting events from parsed SBOM
    Emitting {
        /// The fully parsed document being replayed.
        sbom: Box<NormalizedSbom>,
        /// Position of the next component to emit.
        component_index: usize,
        /// Position of the next dependency edge to emit.
        dependency_index: usize,
        /// Whether the `Metadata` event has already been produced.
        metadata_emitted: bool,
    },
    /// Finished
    Done,
}
262
263impl StreamingIterator {
264    fn new<R: Read + Send + 'static>(
265        reader: BufReader<R>,
266        total_bytes: Option<u64>,
267        config: StreamingConfig,
268    ) -> Self {
269        Self {
270            state: StreamingState::Initial(Box::new(reader)),
271            config,
272            progress: ParseProgress {
273                bytes_read: 0,
274                total_bytes,
275                components_parsed: 0,
276                dependencies_parsed: 0,
277            },
278            pending: VecDeque::new(),
279            complete: false,
280        }
281    }
282
283    /// Collect all events into a `NormalizedSbom`
284    pub fn collect_sbom(&mut self) -> Result<NormalizedSbom, ParseError> {
285        let mut metadata: Option<DocumentMetadata> = None;
286        let mut components = Vec::new();
287        let mut edges = Vec::new();
288
289        for event in self.by_ref() {
290            match event {
291                Ok(ParseEvent::Metadata(doc)) => metadata = Some(doc),
292                Ok(ParseEvent::Component(comp)) => components.push(*comp),
293                Ok(ParseEvent::Dependency(edge)) => edges.push(edge),
294                Ok(ParseEvent::Complete) => break,
295                Err(e) => return Err(e),
296            }
297        }
298
299        let document = metadata.unwrap_or_default();
300        let mut sbom = NormalizedSbom::new(document);
301
302        for comp in components {
303            sbom.add_component(comp);
304        }
305        for edge in edges {
306            sbom.add_edge(edge);
307        }
308
309        sbom.calculate_content_hash();
310        Ok(sbom)
311    }
312
313    fn report_progress(&self) {
314        if let Some(ref callback) = self.config.progress_callback {
315            callback(&self.progress);
316        }
317    }
318
319    fn advance(&mut self) -> Option<Result<ParseEvent, ParseError>> {
320        // Return pending events first
321        if let Some(event) = self.pending.pop_front() {
322            return Some(Ok(event));
323        }
324
325        if self.complete {
326            return None;
327        }
328
329        // Process based on state
330        match std::mem::replace(&mut self.state, StreamingState::Done) {
331            StreamingState::Initial(reader) => {
332                // Use centralized FormatDetector for consistent detection
333                let detector = FormatDetector::new();
334
335                // Parse using the detector which handles format detection and parsing
336                match detector.parse_reader(reader) {
337                    Ok(sbom) => {
338                        self.progress.bytes_read = self.progress.total_bytes.unwrap_or(0);
339                        self.report_progress();
340                        self.state = StreamingState::Emitting {
341                            sbom: Box::new(sbom),
342                            component_index: 0,
343                            dependency_index: 0,
344                            metadata_emitted: false,
345                        };
346                        self.advance()
347                    }
348                    Err(e) => Some(Err(e)),
349                }
350            }
351            StreamingState::Emitting {
352                sbom,
353                component_index,
354                dependency_index,
355                metadata_emitted,
356            } => {
357                // Emit metadata first
358                if !metadata_emitted {
359                    let doc = sbom.document.clone();
360                    self.state = StreamingState::Emitting {
361                        sbom,
362                        component_index,
363                        dependency_index,
364                        metadata_emitted: true,
365                    };
366                    return Some(Ok(ParseEvent::Metadata(doc)));
367                }
368
369                // Collect components into a vec for indexed access
370                let components: Vec<_> = sbom.components.values().cloned().collect();
371                let edges_len = sbom.edges.len();
372
373                // Emit components
374                if component_index < components.len() {
375                    let comp = components[component_index].clone();
376                    self.progress.components_parsed += 1;
377                    if self.progress.components_parsed.is_multiple_of(100) {
378                        self.report_progress();
379                    }
380                    self.state = StreamingState::Emitting {
381                        sbom,
382                        component_index: component_index + 1,
383                        dependency_index,
384                        metadata_emitted,
385                    };
386                    return Some(Ok(ParseEvent::Component(Box::new(comp))));
387                }
388
389                // Emit dependencies
390                if dependency_index < edges_len {
391                    let edge = sbom.edges[dependency_index].clone();
392                    self.progress.dependencies_parsed += 1;
393                    self.state = StreamingState::Emitting {
394                        sbom,
395                        component_index,
396                        dependency_index: dependency_index + 1,
397                        metadata_emitted,
398                    };
399                    return Some(Ok(ParseEvent::Dependency(edge)));
400                }
401
402                // All done
403                self.complete = true;
404                self.report_progress();
405                self.state = StreamingState::Done;
406                Some(Ok(ParseEvent::Complete))
407            }
408            StreamingState::Done => {
409                self.complete = true;
410                None
411            }
412        }
413    }
414}
415
416impl Iterator for StreamingIterator {
417    type Item = Result<ParseEvent, ParseError>;
418
419    fn next(&mut self) -> Option<Self::Item> {
420        self.advance()
421    }
422}
423
424/// Estimate the number of components in an SBOM file without full parsing
425///
426/// This performs a quick scan to estimate component count, useful for
427/// progress reporting and memory allocation.
428pub fn estimate_component_count(path: &Path) -> Result<ComponentEstimate, ParseError> {
429    let file = std::fs::File::open(path)
430        .map_err(|e| ParseError::IoError(format!("Failed to open file: {e}")))?;
431
432    let file_size = file.metadata().map(|m| m.len()).unwrap_or(0);
433
434    let reader = BufReader::new(file);
435    let mut count = 0;
436    let mut bytes_sampled = 0;
437    let sample_limit = 1024 * 1024; // Sample first 1MB
438
439    for line in reader.lines() {
440        let line = line.map_err(|e| ParseError::IoError(e.to_string()))?;
441        bytes_sampled += line.len();
442
443        // Count component markers
444        if line.contains("\"bom-ref\"") || line.contains("\"SPDXID\"") {
445            count += 1;
446        }
447
448        if bytes_sampled > sample_limit {
449            break;
450        }
451    }
452
453    // Extrapolate if we only sampled part of the file
454    let estimated = if bytes_sampled < file_size as usize && bytes_sampled > 0 {
455        (count as f64 * (file_size as f64 / bytes_sampled as f64)) as usize
456    } else {
457        count
458    };
459
460    Ok(ComponentEstimate {
461        estimated_count: estimated,
462        sampled_count: count,
463        file_size,
464        bytes_sampled,
465        is_extrapolated: bytes_sampled < file_size as usize,
466    })
467}
468
/// Estimate of component count.
///
/// Result of `estimate_component_count`: the raw marker count from the
/// sampled prefix of the file together with the figure scaled to the full
/// file size.
#[derive(Debug, Clone)]
pub struct ComponentEstimate {
    /// Estimated total component count (equals `sampled_count` when no
    /// extrapolation was applied)
    pub estimated_count: usize,
    /// Components found in sampled region
    pub sampled_count: usize,
    /// Total file size in bytes (0 if the size could not be determined)
    pub file_size: u64,
    /// Bytes that were sampled
    pub bytes_sampled: usize,
    /// Whether the estimate was extrapolated, i.e. fewer bytes were sampled
    /// than the file contains
    pub is_extrapolated: bool,
}
483
#[cfg(test)]
mod tests {
    use super::*;

    /// Known totals: percentage and completion follow `bytes_read`.
    #[test]
    fn test_progress_percent() {
        let progress = ParseProgress {
            bytes_read: 50,
            total_bytes: Some(100),
            components_parsed: 5,
            dependencies_parsed: 3,
        };
        assert_eq!(progress.percent(), 50.0);
        assert!(!progress.is_complete());

        let complete = ParseProgress {
            bytes_read: 100,
            total_bytes: Some(100),
            components_parsed: 10,
            dependencies_parsed: 5,
        };
        assert_eq!(complete.percent(), 100.0);
        assert!(complete.is_complete());
    }

    /// Unknown or zero totals report 0% and an unknown total is never complete.
    #[test]
    fn test_progress_without_usable_total() {
        let unknown = ParseProgress {
            bytes_read: 50,
            total_bytes: None,
            components_parsed: 0,
            dependencies_parsed: 0,
        };
        assert_eq!(unknown.percent(), 0.0);
        assert!(!unknown.is_complete());

        let zero_total = ParseProgress {
            bytes_read: 0,
            total_bytes: Some(0),
            components_parsed: 0,
            dependencies_parsed: 0,
        };
        assert_eq!(zero_total.percent(), 0.0);
    }

    /// Builder methods store the values they are given.
    #[test]
    fn test_streaming_config_builder() {
        let config = StreamingConfig::default()
            .with_chunk_size(128 * 1024)
            .with_buffer_size(500)
            .with_validation(false)
            .with_skip_malformed(true);

        assert_eq!(config.chunk_size, 128 * 1024);
        assert_eq!(config.component_buffer_size, 500);
        assert!(!config.validate_during_parse);
        assert!(config.skip_malformed);
    }

    /// Builder methods raise too-small sizes to their documented minimums.
    #[test]
    fn test_streaming_config_builder_minimums() {
        let config = StreamingConfig::default()
            .with_chunk_size(1)
            .with_buffer_size(0);

        assert_eq!(config.chunk_size, 1024);
        assert_eq!(config.component_buffer_size, 10);
    }

    /// The hand-written `Debug` impl reports callback presence, not the callback.
    #[test]
    fn test_streaming_config_debug_reports_callback_presence() {
        let without = format!("{:?}", StreamingConfig::default());
        assert!(without.contains("has_progress_callback: false"));

        let with = format!(
            "{:?}",
            StreamingConfig::default().with_progress_callback(|_| {})
        );
        assert!(with.contains("has_progress_callback: true"));
    }

    /// The default parser uses the default 64KB chunk size.
    #[test]
    fn test_streaming_parser_creation() {
        let parser = StreamingParser::default_config();
        assert_eq!(parser.config.chunk_size, 64 * 1024);
    }
}