1use super::detection::FormatDetector;
38use super::traits::ParseError;
39use crate::model::{Component, DependencyEdge, DocumentMetadata, NormalizedSbom};
40use std::collections::VecDeque;
41use std::io::{BufRead, BufReader, Read};
42use std::path::Path;
43use std::sync::Arc;
44
/// Snapshot of how far a streaming parse has progressed.
///
/// A reference to this value is what the registered progress callback
/// receives.
#[derive(Debug, Clone)]
pub struct ParseProgress {
    /// Number of input bytes consumed so far.
    pub bytes_read: u64,
    /// Total size of the input in bytes, when it is known up front.
    pub total_bytes: Option<u64>,
    /// Number of components emitted so far.
    pub components_parsed: usize,
    /// Number of dependency edges emitted so far.
    pub dependencies_parsed: usize,
}
57
58impl ParseProgress {
59 #[must_use]
61 pub fn percent(&self) -> f32 {
62 match self.total_bytes {
63 Some(total) if total > 0 => (self.bytes_read as f32 / total as f32) * 100.0,
64 _ => 0.0,
65 }
66 }
67
68 #[must_use]
70 pub fn is_complete(&self) -> bool {
71 self.total_bytes
72 .is_some_and(|total| self.bytes_read >= total)
73 }
74}
75
76pub type ProgressCallback = Arc<dyn Fn(&ParseProgress) + Send + Sync>;
78
79#[derive(Clone)]
81pub struct StreamingConfig {
82 pub chunk_size: usize,
84 pub component_buffer_size: usize,
86 progress_callback: Option<ProgressCallback>,
88 pub validate_during_parse: bool,
90 pub skip_malformed: bool,
92}
93
94impl Default for StreamingConfig {
95 fn default() -> Self {
96 Self {
97 chunk_size: 64 * 1024, component_buffer_size: 1000,
99 progress_callback: None,
100 validate_during_parse: true,
101 skip_malformed: false,
102 }
103 }
104}
105
106impl StreamingConfig {
107 #[must_use]
109 pub fn with_chunk_size(mut self, size: usize) -> Self {
110 self.chunk_size = size.max(1024); self
112 }
113
114 #[must_use]
116 pub fn with_buffer_size(mut self, size: usize) -> Self {
117 self.component_buffer_size = size.max(10);
118 self
119 }
120
121 #[must_use]
123 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
124 where
125 F: Fn(&ParseProgress) + Send + Sync + 'static,
126 {
127 self.progress_callback = Some(Arc::new(callback));
128 self
129 }
130
131 #[must_use]
133 pub const fn with_validation(mut self, validate: bool) -> Self {
134 self.validate_during_parse = validate;
135 self
136 }
137
138 #[must_use]
140 pub const fn with_skip_malformed(mut self, skip: bool) -> Self {
141 self.skip_malformed = skip;
142 self
143 }
144}
145
146impl std::fmt::Debug for StreamingConfig {
147 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 f.debug_struct("StreamingConfig")
149 .field("chunk_size", &self.chunk_size)
150 .field("component_buffer_size", &self.component_buffer_size)
151 .field("has_progress_callback", &self.progress_callback.is_some())
152 .field("validate_during_parse", &self.validate_during_parse)
153 .field("skip_malformed", &self.skip_malformed)
154 .finish()
155 }
156}
157
158#[derive(Debug, Clone)]
160pub enum ParseEvent {
161 Metadata(DocumentMetadata),
163 Component(Box<Component>),
165 Dependency(DependencyEdge),
167 Complete,
169}
170
171#[derive(Debug)]
173pub struct StreamingParser {
174 config: StreamingConfig,
175}
176
177impl StreamingParser {
178 #[must_use]
180 pub const fn new(config: StreamingConfig) -> Self {
181 Self { config }
182 }
183
184 #[must_use]
186 pub fn default_config() -> Self {
187 Self::new(StreamingConfig::default())
188 }
189
190 pub fn parse_file(&self, path: &Path) -> Result<StreamingIterator, ParseError> {
192 let file = std::fs::File::open(path)
193 .map_err(|e| ParseError::IoError(format!("Failed to open file: {e}")))?;
194
195 let total_bytes = file.metadata().map(|m| m.len()).ok();
196 let reader = BufReader::with_capacity(self.config.chunk_size, file);
197
198 self.parse_reader(reader, total_bytes)
199 }
200
201 pub fn parse_reader<R: Read + Send + 'static>(
203 &self,
204 reader: BufReader<R>,
205 total_bytes: Option<u64>,
206 ) -> Result<StreamingIterator, ParseError> {
207 Ok(StreamingIterator::new(
208 reader,
209 total_bytes,
210 self.config.clone(),
211 ))
212 }
213
214 pub fn parse_str(&self, content: &str) -> Result<StreamingIterator, ParseError> {
216 let cursor = std::io::Cursor::new(content.to_string());
217 let total_bytes = Some(content.len() as u64);
218 let reader = BufReader::new(cursor);
219 self.parse_reader(reader, total_bytes)
220 }
221
222 pub fn parse_to_sbom(&self, path: &Path) -> Result<NormalizedSbom, ParseError> {
227 let mut stream = self.parse_file(path)?;
228 stream.collect_sbom()
229 }
230}
231
232impl Default for StreamingParser {
233 fn default() -> Self {
234 Self::default_config()
235 }
236}
237
238#[allow(dead_code)]
240pub struct StreamingIterator {
241 state: StreamingState,
243 config: StreamingConfig,
245 progress: ParseProgress,
247 pending: VecDeque<ParseEvent>,
249 complete: bool,
251}
252
253enum StreamingState {
254 Initial(Box<dyn BufRead + Send>),
256 Emitting {
258 sbom: Box<NormalizedSbom>,
259 component_index: usize,
260 dependency_index: usize,
261 metadata_emitted: bool,
262 },
263 Done,
265}
266
267impl StreamingIterator {
268 fn new<R: Read + Send + 'static>(
269 reader: BufReader<R>,
270 total_bytes: Option<u64>,
271 config: StreamingConfig,
272 ) -> Self {
273 Self {
274 state: StreamingState::Initial(Box::new(reader)),
275 config,
276 progress: ParseProgress {
277 bytes_read: 0,
278 total_bytes,
279 components_parsed: 0,
280 dependencies_parsed: 0,
281 },
282 pending: VecDeque::new(),
283 complete: false,
284 }
285 }
286
287 pub fn collect_sbom(&mut self) -> Result<NormalizedSbom, ParseError> {
289 let mut metadata: Option<DocumentMetadata> = None;
290 let mut components = Vec::new();
291 let mut edges = Vec::new();
292
293 for event in self.by_ref() {
294 match event {
295 Ok(ParseEvent::Metadata(doc)) => metadata = Some(doc),
296 Ok(ParseEvent::Component(comp)) => components.push(*comp),
297 Ok(ParseEvent::Dependency(edge)) => edges.push(edge),
298 Ok(ParseEvent::Complete) => break,
299 Err(e) => return Err(e),
300 }
301 }
302
303 let document = metadata.unwrap_or_default();
304 let mut sbom = NormalizedSbom::new(document);
305
306 for comp in components {
307 sbom.add_component(comp);
308 }
309 for edge in edges {
310 sbom.add_edge(edge);
311 }
312
313 sbom.calculate_content_hash();
314 Ok(sbom)
315 }
316
317 fn report_progress(&self) {
318 if let Some(ref callback) = self.config.progress_callback {
319 callback(&self.progress);
320 }
321 }
322
323 fn advance(&mut self) -> Option<Result<ParseEvent, ParseError>> {
324 if let Some(event) = self.pending.pop_front() {
326 return Some(Ok(event));
327 }
328
329 if self.complete {
330 return None;
331 }
332
333 match std::mem::replace(&mut self.state, StreamingState::Done) {
335 StreamingState::Initial(reader) => {
336 let detector = FormatDetector::new();
338
339 match detector.parse_reader(reader) {
341 Ok(sbom) => {
342 self.progress.bytes_read = self.progress.total_bytes.unwrap_or(0);
343 self.report_progress();
344 self.state = StreamingState::Emitting {
345 sbom: Box::new(sbom),
346 component_index: 0,
347 dependency_index: 0,
348 metadata_emitted: false,
349 };
350 self.advance()
351 }
352 Err(e) => Some(Err(e)),
353 }
354 }
355 StreamingState::Emitting {
356 sbom,
357 component_index,
358 dependency_index,
359 metadata_emitted,
360 } => {
361 if !metadata_emitted {
363 let doc = sbom.document.clone();
364 self.state = StreamingState::Emitting {
365 sbom,
366 component_index,
367 dependency_index,
368 metadata_emitted: true,
369 };
370 return Some(Ok(ParseEvent::Metadata(doc)));
371 }
372
373 let components: Vec<_> = sbom.components.values().cloned().collect();
375 let edges_len = sbom.edges.len();
376
377 if component_index < components.len() {
379 let comp = components[component_index].clone();
380 self.progress.components_parsed += 1;
381 if self.progress.components_parsed.is_multiple_of(100) {
382 self.report_progress();
383 }
384 self.state = StreamingState::Emitting {
385 sbom,
386 component_index: component_index + 1,
387 dependency_index,
388 metadata_emitted,
389 };
390 return Some(Ok(ParseEvent::Component(Box::new(comp))));
391 }
392
393 if dependency_index < edges_len {
395 let edge = sbom.edges[dependency_index].clone();
396 self.progress.dependencies_parsed += 1;
397 self.state = StreamingState::Emitting {
398 sbom,
399 component_index,
400 dependency_index: dependency_index + 1,
401 metadata_emitted,
402 };
403 return Some(Ok(ParseEvent::Dependency(edge)));
404 }
405
406 self.complete = true;
408 self.report_progress();
409 self.state = StreamingState::Done;
410 Some(Ok(ParseEvent::Complete))
411 }
412 StreamingState::Done => {
413 self.complete = true;
414 None
415 }
416 }
417 }
418}
419
420impl Iterator for StreamingIterator {
421 type Item = Result<ParseEvent, ParseError>;
422
423 fn next(&mut self) -> Option<Self::Item> {
424 self.advance()
425 }
426}
427
428pub fn estimate_component_count(path: &Path) -> Result<ComponentEstimate, ParseError> {
433 let file = std::fs::File::open(path)
434 .map_err(|e| ParseError::IoError(format!("Failed to open file: {e}")))?;
435
436 let file_size = file.metadata().map(|m| m.len()).unwrap_or(0);
437
438 let reader = BufReader::new(file);
439 let mut count = 0;
440 let mut bytes_sampled = 0;
441 let sample_limit = 1024 * 1024; for line in reader.lines() {
444 let line = line.map_err(|e| ParseError::IoError(e.to_string()))?;
445 bytes_sampled += line.len();
446
447 if line.contains("\"bom-ref\"") || line.contains("\"SPDXID\"") {
449 count += 1;
450 }
451
452 if bytes_sampled > sample_limit {
453 break;
454 }
455 }
456
457 let estimated = if bytes_sampled < file_size as usize && bytes_sampled > 0 {
459 (count as f64 * (file_size as f64 / bytes_sampled as f64)) as usize
460 } else {
461 count
462 };
463
464 Ok(ComponentEstimate {
465 estimated_count: estimated,
466 sampled_count: count,
467 file_size,
468 bytes_sampled,
469 is_extrapolated: bytes_sampled < file_size as usize,
470 })
471}
472
/// Result of `estimate_component_count`.
#[derive(Debug, Clone)]
pub struct ComponentEstimate {
    /// Estimated number of components in the whole file.
    pub estimated_count: usize,
    /// Raw marker count found in the sampled portion, before any scaling.
    pub sampled_count: usize,
    /// Size of the file in bytes (`0` when its metadata was unavailable).
    pub file_size: u64,
    /// Number of bytes inspected.
    pub bytes_sampled: usize,
    /// `true` when the sample covered fewer than `file_size` bytes.
    pub is_extrapolated: bool,
}
487
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a progress snapshot with a known total size.
    fn snapshot(bytes_read: u64, total: u64, components: usize, deps: usize) -> ParseProgress {
        ParseProgress {
            bytes_read,
            total_bytes: Some(total),
            components_parsed: components,
            dependencies_parsed: deps,
        }
    }

    #[test]
    fn test_progress_percent() {
        let halfway = snapshot(50, 100, 5, 3);
        assert_eq!(halfway.percent(), 50.0);
        assert!(!halfway.is_complete());

        let finished = snapshot(100, 100, 10, 5);
        assert_eq!(finished.percent(), 100.0);
        assert!(finished.is_complete());
    }

    #[test]
    fn test_streaming_config_builder() {
        let config = StreamingConfig::default()
            .with_chunk_size(128 * 1024)
            .with_buffer_size(500)
            .with_validation(false)
            .with_skip_malformed(true);

        assert_eq!(config.chunk_size, 128 * 1024);
        assert_eq!(config.component_buffer_size, 500);
        assert!(!config.validate_during_parse);
        assert!(config.skip_malformed);
    }

    #[test]
    fn test_streaming_parser_creation() {
        let parser = StreamingParser::default_config();
        assert_eq!(parser.config.chunk_size, 64 * 1024);
    }
}
532}