1use super::detection::FormatDetector;
34use super::traits::ParseError;
35use crate::model::{Component, DependencyEdge, DocumentMetadata, NormalizedSbom};
36use std::collections::VecDeque;
37use std::io::{BufRead, BufReader, Read};
38use std::path::Path;
39use std::sync::Arc;
40
/// Snapshot of how far a streaming parse has advanced.
///
/// A reference to this value is handed to the configured progress callback.
#[derive(Clone, Debug)]
pub struct ParseProgress {
    /// Number of input bytes accounted for so far.
    pub bytes_read: u64,
    /// Total size of the input in bytes, or `None` when it is not known.
    pub total_bytes: Option<u64>,
    /// Number of component events emitted so far.
    pub components_parsed: usize,
    /// Number of dependency events emitted so far.
    pub dependencies_parsed: usize,
}
53
54impl ParseProgress {
55 #[must_use]
57 pub fn percent(&self) -> f32 {
58 match self.total_bytes {
59 Some(total) if total > 0 => (self.bytes_read as f32 / total as f32) * 100.0,
60 _ => 0.0,
61 }
62 }
63
64 #[must_use]
66 pub fn is_complete(&self) -> bool {
67 self.total_bytes
68 .is_some_and(|total| self.bytes_read >= total)
69 }
70}
71
72pub type ProgressCallback = Arc<dyn Fn(&ParseProgress) + Send + Sync>;
74
75#[derive(Clone)]
77pub struct StreamingConfig {
78 pub chunk_size: usize,
80 pub component_buffer_size: usize,
82 progress_callback: Option<ProgressCallback>,
84 pub validate_during_parse: bool,
86 pub skip_malformed: bool,
88}
89
90impl Default for StreamingConfig {
91 fn default() -> Self {
92 Self {
93 chunk_size: 64 * 1024, component_buffer_size: 1000,
95 progress_callback: None,
96 validate_during_parse: true,
97 skip_malformed: false,
98 }
99 }
100}
101
102impl StreamingConfig {
103 #[must_use]
105 pub fn with_chunk_size(mut self, size: usize) -> Self {
106 self.chunk_size = size.max(1024); self
108 }
109
110 #[must_use]
112 pub fn with_buffer_size(mut self, size: usize) -> Self {
113 self.component_buffer_size = size.max(10);
114 self
115 }
116
117 #[must_use]
119 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
120 where
121 F: Fn(&ParseProgress) + Send + Sync + 'static,
122 {
123 self.progress_callback = Some(Arc::new(callback));
124 self
125 }
126
127 #[must_use]
129 pub const fn with_validation(mut self, validate: bool) -> Self {
130 self.validate_during_parse = validate;
131 self
132 }
133
134 #[must_use]
136 pub const fn with_skip_malformed(mut self, skip: bool) -> Self {
137 self.skip_malformed = skip;
138 self
139 }
140}
141
142impl std::fmt::Debug for StreamingConfig {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 f.debug_struct("StreamingConfig")
145 .field("chunk_size", &self.chunk_size)
146 .field("component_buffer_size", &self.component_buffer_size)
147 .field("has_progress_callback", &self.progress_callback.is_some())
148 .field("validate_during_parse", &self.validate_during_parse)
149 .field("skip_malformed", &self.skip_malformed)
150 .finish()
151 }
152}
153
154#[derive(Debug, Clone)]
156pub enum ParseEvent {
157 Metadata(DocumentMetadata),
159 Component(Box<Component>),
161 Dependency(DependencyEdge),
163 Complete,
165}
166
167#[derive(Debug)]
169pub struct StreamingParser {
170 config: StreamingConfig,
171}
172
173impl StreamingParser {
174 #[must_use]
176 pub const fn new(config: StreamingConfig) -> Self {
177 Self { config }
178 }
179
180 #[must_use]
182 pub fn default_config() -> Self {
183 Self::new(StreamingConfig::default())
184 }
185
186 pub fn parse_file(&self, path: &Path) -> Result<StreamingIterator, ParseError> {
188 let file = std::fs::File::open(path)
189 .map_err(|e| ParseError::IoError(format!("Failed to open file: {e}")))?;
190
191 let total_bytes = file.metadata().map(|m| m.len()).ok();
192 let reader = BufReader::with_capacity(self.config.chunk_size, file);
193
194 self.parse_reader(reader, total_bytes)
195 }
196
197 pub fn parse_reader<R: Read + Send + 'static>(
199 &self,
200 reader: BufReader<R>,
201 total_bytes: Option<u64>,
202 ) -> Result<StreamingIterator, ParseError> {
203 Ok(StreamingIterator::new(
204 reader,
205 total_bytes,
206 self.config.clone(),
207 ))
208 }
209
210 pub fn parse_str(&self, content: &str) -> Result<StreamingIterator, ParseError> {
212 let cursor = std::io::Cursor::new(content.to_string());
213 let total_bytes = Some(content.len() as u64);
214 let reader = BufReader::new(cursor);
215 self.parse_reader(reader, total_bytes)
216 }
217
218 pub fn parse_to_sbom(&self, path: &Path) -> Result<NormalizedSbom, ParseError> {
223 let mut stream = self.parse_file(path)?;
224 stream.collect_sbom()
225 }
226}
227
228impl Default for StreamingParser {
229 fn default() -> Self {
230 Self::default_config()
231 }
232}
233
234#[allow(dead_code)]
236pub struct StreamingIterator {
237 state: StreamingState,
239 config: StreamingConfig,
241 progress: ParseProgress,
243 pending: VecDeque<ParseEvent>,
245 complete: bool,
247}
248
249enum StreamingState {
250 Initial(Box<dyn BufRead + Send>),
252 Emitting {
254 sbom: Box<NormalizedSbom>,
255 component_index: usize,
256 dependency_index: usize,
257 metadata_emitted: bool,
258 },
259 Done,
261}
262
263impl StreamingIterator {
264 fn new<R: Read + Send + 'static>(
265 reader: BufReader<R>,
266 total_bytes: Option<u64>,
267 config: StreamingConfig,
268 ) -> Self {
269 Self {
270 state: StreamingState::Initial(Box::new(reader)),
271 config,
272 progress: ParseProgress {
273 bytes_read: 0,
274 total_bytes,
275 components_parsed: 0,
276 dependencies_parsed: 0,
277 },
278 pending: VecDeque::new(),
279 complete: false,
280 }
281 }
282
283 pub fn collect_sbom(&mut self) -> Result<NormalizedSbom, ParseError> {
285 let mut metadata: Option<DocumentMetadata> = None;
286 let mut components = Vec::new();
287 let mut edges = Vec::new();
288
289 for event in self.by_ref() {
290 match event {
291 Ok(ParseEvent::Metadata(doc)) => metadata = Some(doc),
292 Ok(ParseEvent::Component(comp)) => components.push(*comp),
293 Ok(ParseEvent::Dependency(edge)) => edges.push(edge),
294 Ok(ParseEvent::Complete) => break,
295 Err(e) => return Err(e),
296 }
297 }
298
299 let document = metadata.unwrap_or_default();
300 let mut sbom = NormalizedSbom::new(document);
301
302 for comp in components {
303 sbom.add_component(comp);
304 }
305 for edge in edges {
306 sbom.add_edge(edge);
307 }
308
309 sbom.calculate_content_hash();
310 Ok(sbom)
311 }
312
313 fn report_progress(&self) {
314 if let Some(ref callback) = self.config.progress_callback {
315 callback(&self.progress);
316 }
317 }
318
319 fn advance(&mut self) -> Option<Result<ParseEvent, ParseError>> {
320 if let Some(event) = self.pending.pop_front() {
322 return Some(Ok(event));
323 }
324
325 if self.complete {
326 return None;
327 }
328
329 match std::mem::replace(&mut self.state, StreamingState::Done) {
331 StreamingState::Initial(reader) => {
332 let detector = FormatDetector::new();
334
335 match detector.parse_reader(reader) {
337 Ok(sbom) => {
338 self.progress.bytes_read = self.progress.total_bytes.unwrap_or(0);
339 self.report_progress();
340 self.state = StreamingState::Emitting {
341 sbom: Box::new(sbom),
342 component_index: 0,
343 dependency_index: 0,
344 metadata_emitted: false,
345 };
346 self.advance()
347 }
348 Err(e) => Some(Err(e)),
349 }
350 }
351 StreamingState::Emitting {
352 sbom,
353 component_index,
354 dependency_index,
355 metadata_emitted,
356 } => {
357 if !metadata_emitted {
359 let doc = sbom.document.clone();
360 self.state = StreamingState::Emitting {
361 sbom,
362 component_index,
363 dependency_index,
364 metadata_emitted: true,
365 };
366 return Some(Ok(ParseEvent::Metadata(doc)));
367 }
368
369 let components: Vec<_> = sbom.components.values().cloned().collect();
371 let edges_len = sbom.edges.len();
372
373 if component_index < components.len() {
375 let comp = components[component_index].clone();
376 self.progress.components_parsed += 1;
377 if self.progress.components_parsed.is_multiple_of(100) {
378 self.report_progress();
379 }
380 self.state = StreamingState::Emitting {
381 sbom,
382 component_index: component_index + 1,
383 dependency_index,
384 metadata_emitted,
385 };
386 return Some(Ok(ParseEvent::Component(Box::new(comp))));
387 }
388
389 if dependency_index < edges_len {
391 let edge = sbom.edges[dependency_index].clone();
392 self.progress.dependencies_parsed += 1;
393 self.state = StreamingState::Emitting {
394 sbom,
395 component_index,
396 dependency_index: dependency_index + 1,
397 metadata_emitted,
398 };
399 return Some(Ok(ParseEvent::Dependency(edge)));
400 }
401
402 self.complete = true;
404 self.report_progress();
405 self.state = StreamingState::Done;
406 Some(Ok(ParseEvent::Complete))
407 }
408 StreamingState::Done => {
409 self.complete = true;
410 None
411 }
412 }
413 }
414}
415
416impl Iterator for StreamingIterator {
417 type Item = Result<ParseEvent, ParseError>;
418
419 fn next(&mut self) -> Option<Self::Item> {
420 self.advance()
421 }
422}
423
424pub fn estimate_component_count(path: &Path) -> Result<ComponentEstimate, ParseError> {
429 let file = std::fs::File::open(path)
430 .map_err(|e| ParseError::IoError(format!("Failed to open file: {e}")))?;
431
432 let file_size = file.metadata().map(|m| m.len()).unwrap_or(0);
433
434 let reader = BufReader::new(file);
435 let mut count = 0;
436 let mut bytes_sampled = 0;
437 let sample_limit = 1024 * 1024; for line in reader.lines() {
440 let line = line.map_err(|e| ParseError::IoError(e.to_string()))?;
441 bytes_sampled += line.len();
442
443 if line.contains("\"bom-ref\"") || line.contains("\"SPDXID\"") {
445 count += 1;
446 }
447
448 if bytes_sampled > sample_limit {
449 break;
450 }
451 }
452
453 let estimated = if bytes_sampled < file_size as usize && bytes_sampled > 0 {
455 (count as f64 * (file_size as f64 / bytes_sampled as f64)) as usize
456 } else {
457 count
458 };
459
460 Ok(ComponentEstimate {
461 estimated_count: estimated,
462 sampled_count: count,
463 file_size,
464 bytes_sampled,
465 is_extrapolated: bytes_sampled < file_size as usize,
466 })
467}
468
/// Result of [`estimate_component_count`].
#[derive(Clone, Debug)]
pub struct ComponentEstimate {
    /// Estimated number of components in the whole file.
    pub estimated_count: usize,
    /// Number of matching lines actually seen in the sampled portion.
    pub sampled_count: usize,
    /// Size of the file in bytes, or 0 when it could not be determined.
    pub file_size: u64,
    /// Number of bytes accounted for while sampling.
    pub bytes_sampled: usize,
    /// Whether fewer bytes were sampled than the file holds, i.e. whether
    /// `estimated_count` is a projection rather than a direct count.
    pub is_extrapolated: bool,
}
483
#[cfg(test)]
mod tests {
    use super::*;

    /// Half-read input reports 50% and is not complete; fully read input
    /// reports 100% and is complete.
    #[test]
    fn test_progress_percent() {
        let halfway = ParseProgress {
            bytes_read: 50,
            total_bytes: Some(100),
            components_parsed: 5,
            dependencies_parsed: 3,
        };
        assert_eq!(halfway.percent(), 50.0);
        assert!(!halfway.is_complete());

        let finished = ParseProgress {
            bytes_read: 100,
            total_bytes: Some(100),
            components_parsed: 10,
            dependencies_parsed: 5,
        };
        assert_eq!(finished.percent(), 100.0);
        assert!(finished.is_complete());
    }

    /// Every builder method stores the value it was given.
    #[test]
    fn test_streaming_config_builder() {
        let StreamingConfig {
            chunk_size,
            component_buffer_size,
            validate_during_parse,
            skip_malformed,
            ..
        } = StreamingConfig::default()
            .with_chunk_size(128 * 1024)
            .with_buffer_size(500)
            .with_validation(false)
            .with_skip_malformed(true);

        assert_eq!(chunk_size, 128 * 1024);
        assert_eq!(component_buffer_size, 500);
        assert!(!validate_during_parse);
        assert!(skip_malformed);
    }

    /// A parser built from the default configuration uses 64 KiB chunks.
    #[test]
    fn test_streaming_parser_creation() {
        let expected_chunk_size = 64 * 1024;
        let parser = StreamingParser::default_config();
        assert_eq!(parser.config.chunk_size, expected_chunk_size);
    }
}