Skip to main content

sbom_tools/parsers/
streaming.rs

1//! Streaming SBOM parser for large files.
2//!
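//! This module provides memory-efficient parsing for very large SBOMs by:
//! - Using streaming JSON/XML parsing (serde_json::from_reader)
//! - Not buffering the entire file into memory as a string
//! - Yielding results via an iterator interface
//! - Supporting progress callbacks
//!
//! Note: the iterator currently parses the complete document into a
//! `NormalizedSbom` on its first call to `next()` and then replays that model
//! as events, so the parsed model is held in memory while events are emitted.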
8//!
9//! # Usage
10//!
11//! ```no_run
12//! use sbom_tools::parsers::streaming::{StreamingParser, StreamingConfig, ParseEvent};
13//! use std::path::Path;
14//!
15//! let config = StreamingConfig::default()
16//!     .with_chunk_size(64 * 1024)
17//!     .with_progress_callback(|p| println!("Progress: {:.1}%", p.percent()));
18//!
19//! let parser = StreamingParser::new(config);
20//! let stream = parser.parse_file(Path::new("large-sbom.json")).unwrap();
21//!
22//! for event in stream {
23//!     match event {
24//!         Ok(ParseEvent::Metadata(doc)) => println!("Document: {:?}", doc.format),
25//!         Ok(ParseEvent::Component(comp)) => println!("Component: {}", comp.name),
26//!         Ok(ParseEvent::Dependency(edge)) => println!("Dependency: {} -> {}", edge.from, edge.to),
27//!         Ok(ParseEvent::Complete) => println!("Done!"),
28//!         Err(e) => eprintln!("Error: {}", e),
29//!     }
30//! }
31//! ```
32
33use super::detection::FormatDetector;
34use super::traits::ParseError;
35use crate::model::{Component, DependencyEdge, DocumentMetadata, NormalizedSbom};
36use std::collections::VecDeque;
37use std::io::{BufRead, BufReader, Read};
38use std::path::Path;
39use std::sync::Arc;
40
/// Progress information for streaming parsing.
///
/// All counters are plain public fields, so a snapshot can be built or
/// inspected freely by callers.
#[derive(Debug, Clone)]
pub struct ParseProgress {
    /// Bytes read so far
    pub bytes_read: u64,
    /// Total bytes (if known)
    pub total_bytes: Option<u64>,
    /// Components parsed so far
    pub components_parsed: usize,
    /// Dependencies parsed so far
    pub dependencies_parsed: usize,
}

impl ParseProgress {
    /// Get the progress as a percentage in the range `0.0..=100.0`.
    ///
    /// Returns `0.0` when the total size is unknown (`None`) or zero, because
    /// no meaningful ratio can be computed in those cases. The result is
    /// capped at `100.0` even if `bytes_read` exceeds `total_bytes`.
    pub fn percent(&self) -> f32 {
        match self.total_bytes {
            Some(total) if total > 0 => {
                // Divide in f64: casting large u64 byte counts straight to f32
                // discards precision before the ratio is even computed.
                let ratio = self.bytes_read as f64 / total as f64;
                (ratio * 100.0).min(100.0) as f32
            }
            _ => 0.0,
        }
    }

    /// Check if progress is complete.
    ///
    /// Returns `true` once `bytes_read` has reached (or passed) a known total;
    /// always `false` while the total is unknown.
    pub fn is_complete(&self) -> bool {
        match self.total_bytes {
            Some(total) => self.bytes_read >= total,
            None => false,
        }
    }
}
71
/// Progress callback type.
///
/// A reference-counted closure that is invoked with a borrowed
/// [`ParseProgress`] snapshot. The `Send + Sync` bounds make the callback
/// itself shareable between threads.
pub type ProgressCallback = Arc<dyn Fn(&ParseProgress) + Send + Sync>;
74
/// Configuration for streaming parser.
///
/// Built with [`Default`] and refined through the `with_*` builder methods.
/// `Debug` is implemented by hand because the callback is a closure.
#[derive(Clone)]
pub struct StreamingConfig {
    /// Chunk size for reading (default: 64KB).
    /// Used as the `BufReader` capacity when a file is opened for parsing.
    pub chunk_size: usize,
    /// Buffer size for components (default: 1000)
    // NOTE(review): not read by any code visible in this file — confirm usage elsewhere.
    pub component_buffer_size: usize,
    /// Progress callback (optional).
    /// Private: set it through `with_progress_callback`.
    progress_callback: Option<ProgressCallback>,
    /// Whether to validate components during parsing
    // NOTE(review): not read by any code visible in this file — confirm usage elsewhere.
    pub validate_during_parse: bool,
    /// Skip malformed components instead of erroring
    // NOTE(review): not read by any code visible in this file — confirm usage elsewhere.
    pub skip_malformed: bool,
}
89
90impl Default for StreamingConfig {
91    fn default() -> Self {
92        Self {
93            chunk_size: 64 * 1024, // 64KB
94            component_buffer_size: 1000,
95            progress_callback: None,
96            validate_during_parse: true,
97            skip_malformed: false,
98        }
99    }
100}
101
102impl StreamingConfig {
103    /// Set chunk size for reading
104    pub fn with_chunk_size(mut self, size: usize) -> Self {
105        self.chunk_size = size.max(1024); // Minimum 1KB
106        self
107    }
108
109    /// Set component buffer size
110    pub fn with_buffer_size(mut self, size: usize) -> Self {
111        self.component_buffer_size = size.max(10);
112        self
113    }
114
115    /// Set progress callback
116    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
117    where
118        F: Fn(&ParseProgress) + Send + Sync + 'static,
119    {
120        self.progress_callback = Some(Arc::new(callback));
121        self
122    }
123
124    /// Enable/disable validation during parsing
125    pub fn with_validation(mut self, validate: bool) -> Self {
126        self.validate_during_parse = validate;
127        self
128    }
129
130    /// Enable/disable skipping malformed components
131    pub fn with_skip_malformed(mut self, skip: bool) -> Self {
132        self.skip_malformed = skip;
133        self
134    }
135}
136
137impl std::fmt::Debug for StreamingConfig {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("StreamingConfig")
140            .field("chunk_size", &self.chunk_size)
141            .field("component_buffer_size", &self.component_buffer_size)
142            .field("has_progress_callback", &self.progress_callback.is_some())
143            .field("validate_during_parse", &self.validate_during_parse)
144            .field("skip_malformed", &self.skip_malformed)
145            .finish()
146    }
147}
148
/// Events emitted during streaming parsing.
///
/// The iterator in this module emits one `Metadata` event, then every
/// component, then every dependency edge, and finally `Complete`.
#[derive(Debug, Clone)]
pub enum ParseEvent {
    /// Document metadata has been parsed
    Metadata(DocumentMetadata),
    /// A component has been parsed (boxed, so the enum does not embed a whole `Component`)
    Component(Box<Component>),
    /// A dependency relationship has been parsed
    Dependency(DependencyEdge),
    /// Parsing is complete; no further events follow
    Complete,
}
161
/// Streaming parser for large SBOMs.
///
/// Holds only the configuration; each `parse_*` call hands a clone of it to
/// the iterator it returns.
#[derive(Debug)]
pub struct StreamingParser {
    /// Settings applied to every parse started from this parser.
    config: StreamingConfig,
}
167
168impl StreamingParser {
169    /// Create a new streaming parser with the given configuration
170    pub fn new(config: StreamingConfig) -> Self {
171        Self { config }
172    }
173
174    /// Create a streaming parser with default configuration
175    pub fn default_config() -> Self {
176        Self::new(StreamingConfig::default())
177    }
178
179    /// Parse a file and return an iterator of events
180    pub fn parse_file(&self, path: &Path) -> Result<StreamingIterator, ParseError> {
181        let file = std::fs::File::open(path)
182            .map_err(|e| ParseError::IoError(format!("Failed to open file: {}", e)))?;
183
184        let total_bytes = file.metadata().map(|m| m.len()).ok();
185        let reader = BufReader::with_capacity(self.config.chunk_size, file);
186
187        self.parse_reader(reader, total_bytes)
188    }
189
190    /// Parse from a reader and return an iterator of events
191    pub fn parse_reader<R: Read + Send + 'static>(
192        &self,
193        reader: BufReader<R>,
194        total_bytes: Option<u64>,
195    ) -> Result<StreamingIterator, ParseError> {
196        Ok(StreamingIterator::new(
197            reader,
198            total_bytes,
199            self.config.clone(),
200        ))
201    }
202
203    /// Parse from string content
204    pub fn parse_str(&self, content: &str) -> Result<StreamingIterator, ParseError> {
205        let cursor = std::io::Cursor::new(content.to_string());
206        let total_bytes = Some(content.len() as u64);
207        let reader = BufReader::new(cursor);
208        self.parse_reader(reader, total_bytes)
209    }
210
211    /// Collect all events into a NormalizedSbom (for convenience)
212    ///
213    /// Note: This loads the entire SBOM into memory, negating the
214    /// streaming benefits. Use the iterator directly for large files.
215    pub fn parse_to_sbom(&self, path: &Path) -> Result<NormalizedSbom, ParseError> {
216        let mut stream = self.parse_file(path)?;
217        stream.collect_sbom()
218    }
219}
220
221impl Default for StreamingParser {
222    fn default() -> Self {
223        Self::default_config()
224    }
225}
226
/// Iterator over streaming parse events.
///
/// Yields `Result<ParseEvent, ParseError>` items through its `Iterator`
/// implementation.
// NOTE(review): `dead_code` is presumably allowed because not every field is
// fully exercised yet — confirm before removing the attribute.
#[allow(dead_code)]
pub struct StreamingIterator {
    /// Internal state (which phase of parsing/emitting the iterator is in)
    state: StreamingState,
    /// Configuration (supplies the optional progress callback)
    config: StreamingConfig,
    /// Progress tracking (counters handed to the progress callback)
    progress: ParseProgress,
    /// Pending events, returned before any new work is done
    pending: VecDeque<ParseEvent>,
    /// Whether parsing is complete; once set, the iterator yields `None`
    complete: bool,
}
241
/// Phases the streaming iterator moves through.
enum StreamingState {
    /// Initial state - need to detect format and parse.
    /// Owns the (type-erased) buffered reader until parsing starts.
    Initial(Box<dyn BufRead + Send>),
    /// Parsing complete, emitting events from parsed SBOM
    Emitting {
        /// The fully parsed document that events are read from
        sbom: Box<NormalizedSbom>,
        /// Position of the next component to emit
        component_index: usize,
        /// Position of the next dependency edge to emit
        dependency_index: usize,
        /// Whether the metadata event has already been emitted
        metadata_emitted: bool,
    },
    /// Finished
    Done,
}
255
256impl StreamingIterator {
257    fn new<R: Read + Send + 'static>(
258        reader: BufReader<R>,
259        total_bytes: Option<u64>,
260        config: StreamingConfig,
261    ) -> Self {
262        Self {
263            state: StreamingState::Initial(Box::new(reader)),
264            config,
265            progress: ParseProgress {
266                bytes_read: 0,
267                total_bytes,
268                components_parsed: 0,
269                dependencies_parsed: 0,
270            },
271            pending: VecDeque::new(),
272            complete: false,
273        }
274    }
275
276    /// Collect all events into a NormalizedSbom
277    pub fn collect_sbom(&mut self) -> Result<NormalizedSbom, ParseError> {
278        let mut metadata: Option<DocumentMetadata> = None;
279        let mut components = Vec::new();
280        let mut edges = Vec::new();
281
282        for event in self.by_ref() {
283            match event {
284                Ok(ParseEvent::Metadata(doc)) => metadata = Some(doc),
285                Ok(ParseEvent::Component(comp)) => components.push(*comp),
286                Ok(ParseEvent::Dependency(edge)) => edges.push(edge),
287                Ok(ParseEvent::Complete) => break,
288                Err(e) => return Err(e),
289            }
290        }
291
292        let document = metadata.unwrap_or_default();
293        let mut sbom = NormalizedSbom::new(document);
294
295        for comp in components {
296            sbom.add_component(comp);
297        }
298        for edge in edges {
299            sbom.add_edge(edge);
300        }
301
302        sbom.calculate_content_hash();
303        Ok(sbom)
304    }
305
306    fn report_progress(&self) {
307        if let Some(ref callback) = self.config.progress_callback {
308            callback(&self.progress);
309        }
310    }
311
312    fn advance(&mut self) -> Option<Result<ParseEvent, ParseError>> {
313        // Return pending events first
314        if let Some(event) = self.pending.pop_front() {
315            return Some(Ok(event));
316        }
317
318        if self.complete {
319            return None;
320        }
321
322        // Process based on state
323        match std::mem::replace(&mut self.state, StreamingState::Done) {
324            StreamingState::Initial(reader) => {
325                // Use centralized FormatDetector for consistent detection
326                let detector = FormatDetector::new();
327
328                // Parse using the detector which handles format detection and parsing
329                match detector.parse_reader(reader) {
330                    Ok(sbom) => {
331                        self.progress.bytes_read = self.progress.total_bytes.unwrap_or(0);
332                        self.report_progress();
333                        self.state = StreamingState::Emitting {
334                            sbom: Box::new(sbom),
335                            component_index: 0,
336                            dependency_index: 0,
337                            metadata_emitted: false,
338                        };
339                        self.advance()
340                    }
341                    Err(e) => Some(Err(e)),
342                }
343            }
344            StreamingState::Emitting {
345                sbom,
346                component_index,
347                dependency_index,
348                metadata_emitted,
349            } => {
350                // Emit metadata first
351                if !metadata_emitted {
352                    let doc = sbom.document.clone();
353                    self.state = StreamingState::Emitting {
354                        sbom,
355                        component_index,
356                        dependency_index,
357                        metadata_emitted: true,
358                    };
359                    return Some(Ok(ParseEvent::Metadata(doc)));
360                }
361
362                // Collect components into a vec for indexed access
363                let components: Vec<_> = sbom.components.values().cloned().collect();
364                let edges_len = sbom.edges.len();
365
366                // Emit components
367                if component_index < components.len() {
368                    let comp = components[component_index].clone();
369                    self.progress.components_parsed += 1;
370                    if self.progress.components_parsed.is_multiple_of(100) {
371                        self.report_progress();
372                    }
373                    self.state = StreamingState::Emitting {
374                        sbom,
375                        component_index: component_index + 1,
376                        dependency_index,
377                        metadata_emitted,
378                    };
379                    return Some(Ok(ParseEvent::Component(Box::new(comp))));
380                }
381
382                // Emit dependencies
383                if dependency_index < edges_len {
384                    let edge = sbom.edges[dependency_index].clone();
385                    self.progress.dependencies_parsed += 1;
386                    self.state = StreamingState::Emitting {
387                        sbom,
388                        component_index,
389                        dependency_index: dependency_index + 1,
390                        metadata_emitted,
391                    };
392                    return Some(Ok(ParseEvent::Dependency(edge)));
393                }
394
395                // All done
396                self.complete = true;
397                self.report_progress();
398                self.state = StreamingState::Done;
399                Some(Ok(ParseEvent::Complete))
400            }
401            StreamingState::Done => {
402                self.complete = true;
403                None
404            }
405        }
406    }
407}
408
/// Exposes the parse events through the standard `Iterator` interface.
impl Iterator for StreamingIterator {
    /// Each item is either a successfully produced event or a parse error.
    type Item = Result<ParseEvent, ParseError>;

    /// Returns the next event by delegating to the internal `advance` step;
    /// `None` means the stream is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        self.advance()
    }
}
416
417/// Estimate the number of components in an SBOM file without full parsing
418///
419/// This performs a quick scan to estimate component count, useful for
420/// progress reporting and memory allocation.
421pub fn estimate_component_count(path: &Path) -> Result<ComponentEstimate, ParseError> {
422    let file = std::fs::File::open(path)
423        .map_err(|e| ParseError::IoError(format!("Failed to open file: {}", e)))?;
424
425    let file_size = file.metadata().map(|m| m.len()).unwrap_or(0);
426
427    let reader = BufReader::new(file);
428    let mut count = 0;
429    let mut bytes_sampled = 0;
430    let sample_limit = 1024 * 1024; // Sample first 1MB
431
432    for line in reader.lines() {
433        let line = line.map_err(|e| ParseError::IoError(e.to_string()))?;
434        bytes_sampled += line.len();
435
436        // Count component markers
437        if line.contains("\"bom-ref\"") || line.contains("\"SPDXID\"") {
438            count += 1;
439        }
440
441        if bytes_sampled > sample_limit {
442            break;
443        }
444    }
445
446    // Extrapolate if we only sampled part of the file
447    let estimated = if bytes_sampled < file_size as usize && bytes_sampled > 0 {
448        (count as f64 * (file_size as f64 / bytes_sampled as f64)) as usize
449    } else {
450        count
451    };
452
453    Ok(ComponentEstimate {
454        estimated_count: estimated,
455        sampled_count: count,
456        file_size,
457        bytes_sampled,
458        is_extrapolated: bytes_sampled < file_size as usize,
459    })
460}
461
/// Estimate of component count.
///
/// Returned by `estimate_component_count`; all fields are plain values that
/// describe both the estimate and how much of the file backed it.
#[derive(Debug, Clone)]
pub struct ComponentEstimate {
    /// Estimated total component count (scaled up when only part of the file was sampled)
    pub estimated_count: usize,
    /// Components found in sampled region
    pub sampled_count: usize,
    /// Total file size in bytes (0 when the size could not be determined)
    pub file_size: u64,
    /// Bytes that were sampled
    pub bytes_sampled: usize,
    /// Whether the estimate was extrapolated
    pub is_extrapolated: bool,
}
476
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a progress snapshot against a known total of 100 bytes.
    fn snapshot(bytes_read: u64, components: usize, dependencies: usize) -> ParseProgress {
        ParseProgress {
            bytes_read,
            total_bytes: Some(100),
            components_parsed: components,
            dependencies_parsed: dependencies,
        }
    }

    /// Halfway and fully-read snapshots report the expected percentage and
    /// completion flag.
    #[test]
    fn test_progress_percent() {
        let halfway = snapshot(50, 5, 3);
        assert_eq!(halfway.percent(), 50.0);
        assert!(!halfway.is_complete());

        let finished = snapshot(100, 10, 5);
        assert_eq!(finished.percent(), 100.0);
        assert!(finished.is_complete());
    }

    /// Every builder method stores the value it was given.
    #[test]
    fn test_streaming_config_builder() {
        let expected_chunk = 128 * 1024;
        let expected_buffer = 500;

        let built = StreamingConfig::default()
            .with_chunk_size(expected_chunk)
            .with_buffer_size(expected_buffer)
            .with_validation(false)
            .with_skip_malformed(true);

        assert_eq!(built.chunk_size, expected_chunk);
        assert_eq!(built.component_buffer_size, expected_buffer);
        assert!(!built.validate_during_parse);
        assert!(built.skip_malformed);
    }

    /// A parser built with the default configuration uses 64 KiB chunks.
    #[test]
    fn test_streaming_parser_creation() {
        let default_parser = StreamingParser::default_config();
        assert_eq!(default_parser.config.chunk_size, 64 * 1024);
    }
}