1use super::detection::FormatDetector;
34use super::traits::ParseError;
35use crate::model::{Component, DependencyEdge, DocumentMetadata, NormalizedSbom};
36use std::collections::VecDeque;
37use std::io::{BufRead, BufReader, Read};
38use std::path::Path;
39use std::sync::Arc;
40
/// Snapshot of how far a streaming parse has progressed.
///
/// Handed by reference to the configured progress callback.
#[derive(Debug, Clone)]
pub struct ParseProgress {
    /// Bytes of input consumed so far.
    pub bytes_read: u64,
    /// Total input size in bytes, when it is known up front (`None` otherwise).
    pub total_bytes: Option<u64>,
    /// Number of components emitted so far.
    pub components_parsed: usize,
    /// Number of dependency edges emitted so far.
    pub dependencies_parsed: usize,
}

impl ParseProgress {
    /// Percentage of the input consumed.
    ///
    /// Returns `0.0` when the total size is unknown or zero. The value is not
    /// clamped, so it can exceed `100.0` if `bytes_read` is larger than the total.
    pub fn percent(&self) -> f32 {
        self.total_bytes
            .filter(|&total| total > 0)
            .map_or(0.0, |total| (self.bytes_read as f32 / total as f32) * 100.0)
    }

    /// Whether all of the input has been consumed.
    ///
    /// Always `false` when the total size is unknown. A known total of zero
    /// counts as complete.
    pub fn is_complete(&self) -> bool {
        self.total_bytes
            .is_some_and(|total| self.bytes_read >= total)
    }
}
71
72pub type ProgressCallback = Arc<dyn Fn(&ParseProgress) + Send + Sync>;
74
75#[derive(Clone)]
77pub struct StreamingConfig {
78 pub chunk_size: usize,
80 pub component_buffer_size: usize,
82 progress_callback: Option<ProgressCallback>,
84 pub validate_during_parse: bool,
86 pub skip_malformed: bool,
88}
89
90impl Default for StreamingConfig {
91 fn default() -> Self {
92 Self {
93 chunk_size: 64 * 1024, component_buffer_size: 1000,
95 progress_callback: None,
96 validate_during_parse: true,
97 skip_malformed: false,
98 }
99 }
100}
101
102impl StreamingConfig {
103 pub fn with_chunk_size(mut self, size: usize) -> Self {
105 self.chunk_size = size.max(1024); self
107 }
108
109 pub fn with_buffer_size(mut self, size: usize) -> Self {
111 self.component_buffer_size = size.max(10);
112 self
113 }
114
115 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
117 where
118 F: Fn(&ParseProgress) + Send + Sync + 'static,
119 {
120 self.progress_callback = Some(Arc::new(callback));
121 self
122 }
123
124 pub fn with_validation(mut self, validate: bool) -> Self {
126 self.validate_during_parse = validate;
127 self
128 }
129
130 pub fn with_skip_malformed(mut self, skip: bool) -> Self {
132 self.skip_malformed = skip;
133 self
134 }
135}
136
137impl std::fmt::Debug for StreamingConfig {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 f.debug_struct("StreamingConfig")
140 .field("chunk_size", &self.chunk_size)
141 .field("component_buffer_size", &self.component_buffer_size)
142 .field("has_progress_callback", &self.progress_callback.is_some())
143 .field("validate_during_parse", &self.validate_during_parse)
144 .field("skip_malformed", &self.skip_malformed)
145 .finish()
146 }
147}
148
149#[derive(Debug, Clone)]
151pub enum ParseEvent {
152 Metadata(DocumentMetadata),
154 Component(Box<Component>),
156 Dependency(DependencyEdge),
158 Complete,
160}
161
162#[derive(Debug)]
164pub struct StreamingParser {
165 config: StreamingConfig,
166}
167
168impl StreamingParser {
169 pub fn new(config: StreamingConfig) -> Self {
171 Self { config }
172 }
173
174 pub fn default_config() -> Self {
176 Self::new(StreamingConfig::default())
177 }
178
179 pub fn parse_file(&self, path: &Path) -> Result<StreamingIterator, ParseError> {
181 let file = std::fs::File::open(path)
182 .map_err(|e| ParseError::IoError(format!("Failed to open file: {}", e)))?;
183
184 let total_bytes = file.metadata().map(|m| m.len()).ok();
185 let reader = BufReader::with_capacity(self.config.chunk_size, file);
186
187 self.parse_reader(reader, total_bytes)
188 }
189
190 pub fn parse_reader<R: Read + Send + 'static>(
192 &self,
193 reader: BufReader<R>,
194 total_bytes: Option<u64>,
195 ) -> Result<StreamingIterator, ParseError> {
196 Ok(StreamingIterator::new(
197 reader,
198 total_bytes,
199 self.config.clone(),
200 ))
201 }
202
203 pub fn parse_str(&self, content: &str) -> Result<StreamingIterator, ParseError> {
205 let cursor = std::io::Cursor::new(content.to_string());
206 let total_bytes = Some(content.len() as u64);
207 let reader = BufReader::new(cursor);
208 self.parse_reader(reader, total_bytes)
209 }
210
211 pub fn parse_to_sbom(&self, path: &Path) -> Result<NormalizedSbom, ParseError> {
216 let mut stream = self.parse_file(path)?;
217 stream.collect_sbom()
218 }
219}
220
221impl Default for StreamingParser {
222 fn default() -> Self {
223 Self::default_config()
224 }
225}
226
227#[allow(dead_code)]
229pub struct StreamingIterator {
230 state: StreamingState,
232 config: StreamingConfig,
234 progress: ParseProgress,
236 pending: VecDeque<ParseEvent>,
238 complete: bool,
240}
241
242enum StreamingState {
243 Initial(Box<dyn BufRead + Send>),
245 Emitting {
247 sbom: Box<NormalizedSbom>,
248 component_index: usize,
249 dependency_index: usize,
250 metadata_emitted: bool,
251 },
252 Done,
254}
255
256impl StreamingIterator {
257 fn new<R: Read + Send + 'static>(
258 reader: BufReader<R>,
259 total_bytes: Option<u64>,
260 config: StreamingConfig,
261 ) -> Self {
262 Self {
263 state: StreamingState::Initial(Box::new(reader)),
264 config,
265 progress: ParseProgress {
266 bytes_read: 0,
267 total_bytes,
268 components_parsed: 0,
269 dependencies_parsed: 0,
270 },
271 pending: VecDeque::new(),
272 complete: false,
273 }
274 }
275
276 pub fn collect_sbom(&mut self) -> Result<NormalizedSbom, ParseError> {
278 let mut metadata: Option<DocumentMetadata> = None;
279 let mut components = Vec::new();
280 let mut edges = Vec::new();
281
282 for event in self.by_ref() {
283 match event {
284 Ok(ParseEvent::Metadata(doc)) => metadata = Some(doc),
285 Ok(ParseEvent::Component(comp)) => components.push(*comp),
286 Ok(ParseEvent::Dependency(edge)) => edges.push(edge),
287 Ok(ParseEvent::Complete) => break,
288 Err(e) => return Err(e),
289 }
290 }
291
292 let document = metadata.unwrap_or_default();
293 let mut sbom = NormalizedSbom::new(document);
294
295 for comp in components {
296 sbom.add_component(comp);
297 }
298 for edge in edges {
299 sbom.add_edge(edge);
300 }
301
302 sbom.calculate_content_hash();
303 Ok(sbom)
304 }
305
306 fn report_progress(&self) {
307 if let Some(ref callback) = self.config.progress_callback {
308 callback(&self.progress);
309 }
310 }
311
312 fn advance(&mut self) -> Option<Result<ParseEvent, ParseError>> {
313 if let Some(event) = self.pending.pop_front() {
315 return Some(Ok(event));
316 }
317
318 if self.complete {
319 return None;
320 }
321
322 match std::mem::replace(&mut self.state, StreamingState::Done) {
324 StreamingState::Initial(reader) => {
325 let detector = FormatDetector::new();
327
328 match detector.parse_reader(reader) {
330 Ok(sbom) => {
331 self.progress.bytes_read = self.progress.total_bytes.unwrap_or(0);
332 self.report_progress();
333 self.state = StreamingState::Emitting {
334 sbom: Box::new(sbom),
335 component_index: 0,
336 dependency_index: 0,
337 metadata_emitted: false,
338 };
339 self.advance()
340 }
341 Err(e) => Some(Err(e)),
342 }
343 }
344 StreamingState::Emitting {
345 sbom,
346 component_index,
347 dependency_index,
348 metadata_emitted,
349 } => {
350 if !metadata_emitted {
352 let doc = sbom.document.clone();
353 self.state = StreamingState::Emitting {
354 sbom,
355 component_index,
356 dependency_index,
357 metadata_emitted: true,
358 };
359 return Some(Ok(ParseEvent::Metadata(doc)));
360 }
361
362 let components: Vec<_> = sbom.components.values().cloned().collect();
364 let edges_len = sbom.edges.len();
365
366 if component_index < components.len() {
368 let comp = components[component_index].clone();
369 self.progress.components_parsed += 1;
370 if self.progress.components_parsed.is_multiple_of(100) {
371 self.report_progress();
372 }
373 self.state = StreamingState::Emitting {
374 sbom,
375 component_index: component_index + 1,
376 dependency_index,
377 metadata_emitted,
378 };
379 return Some(Ok(ParseEvent::Component(Box::new(comp))));
380 }
381
382 if dependency_index < edges_len {
384 let edge = sbom.edges[dependency_index].clone();
385 self.progress.dependencies_parsed += 1;
386 self.state = StreamingState::Emitting {
387 sbom,
388 component_index,
389 dependency_index: dependency_index + 1,
390 metadata_emitted,
391 };
392 return Some(Ok(ParseEvent::Dependency(edge)));
393 }
394
395 self.complete = true;
397 self.report_progress();
398 self.state = StreamingState::Done;
399 Some(Ok(ParseEvent::Complete))
400 }
401 StreamingState::Done => {
402 self.complete = true;
403 None
404 }
405 }
406 }
407}
408
409impl Iterator for StreamingIterator {
410 type Item = Result<ParseEvent, ParseError>;
411
412 fn next(&mut self) -> Option<Self::Item> {
413 self.advance()
414 }
415}
416
417pub fn estimate_component_count(path: &Path) -> Result<ComponentEstimate, ParseError> {
422 let file = std::fs::File::open(path)
423 .map_err(|e| ParseError::IoError(format!("Failed to open file: {}", e)))?;
424
425 let file_size = file.metadata().map(|m| m.len()).unwrap_or(0);
426
427 let reader = BufReader::new(file);
428 let mut count = 0;
429 let mut bytes_sampled = 0;
430 let sample_limit = 1024 * 1024; for line in reader.lines() {
433 let line = line.map_err(|e| ParseError::IoError(e.to_string()))?;
434 bytes_sampled += line.len();
435
436 if line.contains("\"bom-ref\"") || line.contains("\"SPDXID\"") {
438 count += 1;
439 }
440
441 if bytes_sampled > sample_limit {
442 break;
443 }
444 }
445
446 let estimated = if bytes_sampled < file_size as usize && bytes_sampled > 0 {
448 (count as f64 * (file_size as f64 / bytes_sampled as f64)) as usize
449 } else {
450 count
451 };
452
453 Ok(ComponentEstimate {
454 estimated_count: estimated,
455 sampled_count: count,
456 file_size,
457 bytes_sampled,
458 is_extrapolated: bytes_sampled < file_size as usize,
459 })
460}
461
/// Result of [`estimate_component_count`].
#[derive(Debug, Clone)]
pub struct ComponentEstimate {
    /// Estimated component count for the whole file (scaled when only a prefix was sampled).
    pub estimated_count: usize,
    /// Raw marker count observed in the sampled prefix.
    pub sampled_count: usize,
    /// File size in bytes from metadata (0 if the metadata could not be read).
    pub file_size: u64,
    /// Number of bytes counted as sampled.
    pub bytes_sampled: usize,
    /// Whether `estimated_count` was scaled up from a partial sample.
    pub is_extrapolated: bool,
}
476
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a progress snapshot for the assertions below.
    fn snapshot(
        bytes_read: u64,
        total_bytes: Option<u64>,
        components_parsed: usize,
        dependencies_parsed: usize,
    ) -> ParseProgress {
        ParseProgress {
            bytes_read,
            total_bytes,
            components_parsed,
            dependencies_parsed,
        }
    }

    #[test]
    fn test_progress_percent() {
        let halfway = snapshot(50, Some(100), 5, 3);
        assert_eq!(halfway.percent(), 50.0);
        assert!(!halfway.is_complete());

        let finished = snapshot(100, Some(100), 10, 5);
        assert_eq!(finished.percent(), 100.0);
        assert!(finished.is_complete());
    }

    #[test]
    fn test_streaming_config_builder() {
        // The setters are independent, so call order does not matter.
        let config = StreamingConfig::default()
            .with_skip_malformed(true)
            .with_validation(false)
            .with_buffer_size(500)
            .with_chunk_size(128 * 1024);

        assert_eq!(config.chunk_size, 128 * 1024);
        assert_eq!(config.component_buffer_size, 500);
        assert!(!config.validate_during_parse);
        assert!(config.skip_malformed);
    }

    #[test]
    fn test_streaming_parser_creation() {
        let chunk_size = StreamingParser::default_config().config.chunk_size;
        assert_eq!(chunk_size, 64 * 1024);
    }
}