use crate::parser::{SarifError, SarifResult as ParseResult};
use crate::types::{Run, SarifLog};
use serde_json::Value;
use std::collections::VecDeque;
use std::io::{BufReader, Read};
/// Reader for a SARIF log that hands out its runs one at a time.
///
/// NOTE(review): despite the name, `initialize` reads the entire input into
/// memory and parses it in one step; only the hand-out of already decoded
/// runs is incremental.
pub struct StreamingParser<R: Read> {
/// Buffered wrapper around the caller-supplied input.
reader: BufReader<R>,
/// Where the parser currently is in its lifecycle.
state: ParserState,
/// Number of runs handed out so far (incremented each time a run is yielded).
current_run_index: usize,
/// Runs that have been decoded but not yet handed out, in document order.
runs_buffer: VecDeque<Run>,
/// Top-level log properties; `None` until the document has been parsed.
metadata: Option<SarifMetadata>,
}
/// Lifecycle of a `StreamingParser`.
#[derive(Debug, Clone, PartialEq)]
enum ParserState {
/// Nothing has been read from the input yet.
Initial,
/// The document has been parsed and buffered runs are being handed out.
ReadingRuns,
/// The run buffer was found empty; there is nothing left to hand out.
Finished,
/// Parsing failed; no further runs are produced in this state.
Error,
}
/// Top-level properties of a SARIF log, captured separately from its runs.
#[derive(Debug, Clone)]
pub struct SarifMetadata {
/// Value of the `version` property; the parser substitutes `"2.1.0"` when
/// the property is missing or is not a string.
pub version: String,
/// Value of the `$schema` property, when present as a string.
pub schema: Option<String>,
/// Raw JSON elements of the `inlineExternalProperties` array, when present.
pub inline_external_properties: Option<Vec<Value>>,
}
impl<R: Read> StreamingParser<R> {
pub fn new(reader: R) -> Self {
Self {
reader: BufReader::new(reader),
state: ParserState::Initial,
current_run_index: 0,
runs_buffer: VecDeque::new(),
metadata: None,
}
}
pub fn with_capacity(reader: R, capacity: usize) -> Self {
Self {
reader: BufReader::with_capacity(capacity, reader),
state: ParserState::Initial,
current_run_index: 0,
runs_buffer: VecDeque::new(),
metadata: None,
}
}
pub fn metadata(&self) -> Option<&SarifMetadata> {
self.metadata.as_ref()
}
pub fn current_run_index(&self) -> usize {
self.current_run_index
}
pub fn is_finished(&self) -> bool {
matches!(self.state, ParserState::Finished | ParserState::Error)
}
pub fn parse_runs(&mut self) -> StreamingRunIterator<'_, R> {
StreamingRunIterator {
parser: self,
initialized: false,
}
}
pub fn parse_run_batch(&mut self, batch_size: usize) -> ParseResult<Vec<Run>> {
let mut runs = Vec::with_capacity(batch_size);
let mut iterator = self.parse_runs();
for _ in 0..batch_size {
match iterator.next() {
Some(Ok(run)) => runs.push(run),
Some(Err(e)) => return Err(e),
None => break,
}
}
Ok(runs)
}
pub fn parse_complete(mut self) -> ParseResult<SarifLog> {
let mut content = String::new();
self.reader.read_to_string(&mut content)?;
crate::from_str(&content)
}
fn initialize(&mut self) -> ParseResult<()> {
if !matches!(self.state, ParserState::Initial) {
return Ok(());
}
let mut content = String::new();
self.reader.read_to_string(&mut content)?;
let value: Value = serde_json::from_str(&content)?;
if let Some(obj) = value.as_object() {
let version = obj
.get("version")
.and_then(|v| v.as_str())
.unwrap_or("2.1.0")
.to_string();
let schema = obj
.get("$schema")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let inline_external_properties = obj
.get("inlineExternalProperties")
.and_then(|v| v.as_array())
.map(|arr| arr.clone());
self.metadata = Some(SarifMetadata {
version,
schema,
inline_external_properties,
});
if let Some(runs_value) = obj.get("runs")
&& let Some(runs_array) = runs_value.as_array()
{
for run_value in runs_array {
match serde_json::from_value::<Run>(run_value.clone()) {
Ok(run) => self.runs_buffer.push_back(run),
Err(e) => {
self.state = ParserState::Error;
return Err(SarifError::from(e));
}
}
}
}
self.state = ParserState::ReadingRuns;
} else {
self.state = ParserState::Error;
return Err(SarifError::custom("Invalid SARIF JSON structure"));
}
Ok(())
}
fn next_run(&mut self) -> Option<ParseResult<Run>> {
if matches!(self.state, ParserState::Error) {
return None;
}
if let Some(run) = self.runs_buffer.pop_front() {
self.current_run_index += 1;
Some(Ok(run))
} else {
self.state = ParserState::Finished;
None
}
}
}
/// Iterator over the runs of a `StreamingParser`, created by `parse_runs`.
pub struct StreamingRunIterator<'a, R: Read> {
/// The parser the runs are pulled from, borrowed for the iterator's lifetime.
parser: &'a mut StreamingParser<R>,
/// Whether this iterator has already had the parser initialize successfully.
initialized: bool,
}
impl<'a, R: Read> Iterator for StreamingRunIterator<'a, R> {
    type Item = ParseResult<Run>;

    /// Yields the parser's next run.
    ///
    /// Until an initialization attempt has succeeded, each call first asks the
    /// parser to initialize; a failure of that step is yielded as an `Err` item.
    fn next(&mut self) -> Option<Self::Item> {
        if self.initialized {
            return self.parser.next_run();
        }
        match self.parser.initialize() {
            Ok(()) => {
                self.initialized = true;
                self.parser.next_run()
            }
            Err(failure) => Some(Err(failure)),
        }
    }
}
/// Running totals gathered from the runs passed to `update_with_run`.
#[derive(Debug, Default, Clone)]
pub struct StreamingStats {
/// Number of runs folded into these totals.
pub runs_processed: usize,
/// Total number of results across those runs.
pub total_results: usize,
/// Total number of artifacts across those runs.
pub total_artifacts: usize,
/// Results whose level is `Error`.
pub error_count: usize,
/// Results whose level is `Warning`.
pub warning_count: usize,
/// Results that carry no level at all.
pub info_count: usize,
/// Results whose level is `Note` or `None`.
/// NOTE(review): `Level::None` being counted here rather than separately
/// looks like it may be unintentional — confirm.
pub note_count: usize,
}
impl StreamingStats {
pub fn update_with_run(&mut self, run: &Run) {
self.runs_processed += 1;
if let Some(artifacts) = &run.artifacts {
self.total_artifacts += artifacts.len();
}
if let Some(results) = &run.results {
self.total_results += results.len();
for result in results {
match result.level.as_ref() {
Some(crate::types::Level::Error) => self.error_count += 1,
Some(crate::types::Level::Warning) => self.warning_count += 1,
Some(crate::types::Level::Note) => self.note_count += 1,
Some(crate::types::Level::None) => self.note_count += 1,
None => self.info_count += 1,
}
}
}
}
pub fn total_findings(&self) -> usize {
self.error_count + self.warning_count + self.info_count + self.note_count
}
pub fn reset(&mut self) {
*self = Self::default();
}
}
/// Drives a `StreamingParser` and accumulates `StreamingStats` while doing so.
pub struct StreamingProcessor<R: Read> {
/// Source of the runs.
parser: StreamingParser<R>,
/// Totals accumulated over every run processed so far.
stats: StreamingStats,
/// Optional ceiling, in MB, for the estimated memory use; `None` disables the check.
max_memory_mb: Option<usize>,
}
impl<R: Read> StreamingProcessor<R> {
    /// Creates a processor over `reader` with empty statistics and no memory limit.
    pub fn new(reader: R) -> Self {
        Self {
            parser: StreamingParser::new(reader),
            stats: StreamingStats::default(),
            max_memory_mb: None,
        }
    }

    /// Builder-style setter: enables the memory check with a ceiling of
    /// `max_memory_mb` megabytes.
    pub fn with_memory_limit(mut self, max_memory_mb: usize) -> Self {
        self.max_memory_mb = Some(max_memory_mb);
        self
    }

    /// Invokes `callback` once for every remaining run, together with the
    /// log's metadata, and returns a snapshot of the accumulated statistics.
    ///
    /// The statistics are cumulative across calls; they are not reset here.
    ///
    /// # Errors
    ///
    /// * any error raised while loading the document,
    /// * `"Empty SARIF file"` when there is no run left to process,
    /// * `"No metadata available"` when the metadata was never captured,
    /// * any error returned by `callback`,
    /// * `"Memory limit exceeded: …"` when the estimate passes the configured limit.
    pub fn process_with_callback<F>(&mut self, mut callback: F) -> ParseResult<StreamingStats>
    where
        F: FnMut(&Run, &SarifMetadata) -> ParseResult<()>,
    {
        // Load the document up front and inspect the buffer directly. Probing
        // with a throwaway iterator's `next()` would pop the first run (so the
        // callback would never see it) and would hide a loading error.
        self.parser.initialize()?;
        if self.parser.is_finished() || self.parser.runs_buffer.is_empty() {
            return Err(SarifError::custom("Empty SARIF file"));
        }

        // Cloned so the callback can borrow it while the parser is mutably
        // borrowed by the run iterator.
        let metadata = self
            .parser
            .metadata()
            .cloned()
            .ok_or_else(|| SarifError::custom("No metadata available"))?;

        for run_result in self.parser.parse_runs() {
            let run = run_result?;
            callback(&run, &metadata)?;
            self.stats.update_with_run(&run);

            if let Some(max_mb) = self.max_memory_mb {
                // The estimate is only re-evaluated on every 100th run.
                if self.stats.runs_processed % 100 == 0 {
                    // Presumably a rough allowance of 1000 bytes per result.
                    let estimated_mb =
                        estimate_memory_usage_mb(self.stats.total_results as u64 * 1000);
                    if estimated_mb > max_mb as u64 {
                        return Err(SarifError::custom(format!(
                            "Memory limit exceeded: {} MB",
                            max_mb
                        )));
                    }
                }
            }
        }
        Ok(self.stats.clone())
    }

    /// Collects a clone of every result, across all remaining runs, for which
    /// `predicate` returns `true`.
    ///
    /// # Errors
    ///
    /// Propagates every error `process_with_callback` can return.
    pub fn filter_results<F>(&mut self, mut predicate: F) -> ParseResult<Vec<crate::types::Result>>
    where
        F: FnMut(&crate::types::Result) -> bool,
    {
        let mut filtered_results = Vec::new();
        self.process_with_callback(|run, _metadata| {
            if let Some(results) = &run.results {
                for result in results {
                    if predicate(result) {
                        filtered_results.push(result.clone());
                    }
                }
            }
            Ok(())
        })?;
        Ok(filtered_results)
    }

    /// Returns the statistics accumulated so far.
    pub fn stats(&self) -> &StreamingStats {
        &self.stats
    }
}
/// Checks that `content` is syntactically valid JSON.
///
/// Only JSON well-formedness is checked; nothing SARIF-specific is validated.
///
/// # Errors
///
/// Returns the JSON parser's error, converted into the crate's error type.
pub fn validate_json_structure(content: &str) -> ParseResult<()> {
    match serde_json::from_str::<serde_json::Value>(content) {
        Ok(_) => Ok(()),
        Err(err) => Err(err.into()),
    }
}
/// Estimates the memory, in bytes, needed to hold a parsed file of
/// `file_size` bytes, using a fixed factor of four times the file size.
///
/// The multiplication saturates at `u64::MAX` instead of overflowing, so the
/// function never panics (debug builds) or wraps to a small value (release
/// builds) for very large inputs.
pub fn estimate_memory_usage(file_size: u64) -> u64 {
    file_size.saturating_mul(4)
}
/// Same estimate as `estimate_memory_usage`, expressed in whole units of
/// 1024 * 1024 bytes (the remainder is discarded by integer division).
pub fn estimate_memory_usage_mb(file_size: u64) -> u64 {
    const BYTES_PER_MB: u64 = 1024 * 1024;
    let estimated_bytes = estimate_memory_usage(file_size);
    estimated_bytes / BYTES_PER_MB
}
/// Returns `true` when `file_size` is strictly greater than 100 * 1024 * 1024 bytes.
pub fn is_large_file(file_size: u64) -> bool {
    const LARGE_FILE_THRESHOLD_BYTES: u64 = 100 * 1024 * 1024;
    file_size > LARGE_FILE_THRESHOLD_BYTES
}
/// Returns `true` when the file counts as large according to `is_large_file`,
/// or when its estimated memory use exceeds 500 MB.
pub fn recommend_streaming(file_size: u64) -> bool {
    // The size check comes first; the estimate is only computed when it fails.
    if is_large_file(file_size) {
        return true;
    }
    estimate_memory_usage_mb(file_size) > 500
}
// Unit tests for the parser, the statistics and the size heuristics.
#[cfg(test)]
mod tests {
use super::*;
use crate::builder::SarifLogBuilder;
use std::io::Cursor;
// A freshly constructed parser has handed out nothing and is not finished.
#[test]
fn test_streaming_parser_creation() {
let data = r#"{"version": "2.1.0", "runs": []}"#;
let cursor = Cursor::new(data);
let parser = StreamingParser::new(cursor);
assert_eq!(parser.current_run_index(), 0);
assert!(!parser.is_finished());
}
// Pins the 4x estimate, its MB conversion and both streaming thresholds.
#[test]
fn test_memory_estimation() {
assert_eq!(estimate_memory_usage(1000), 4000);
assert_eq!(estimate_memory_usage(0), 0);
assert_eq!(estimate_memory_usage_mb(1024 * 1024), 4);
assert!(is_large_file(200 * 1024 * 1024));
assert!(!is_large_file(50 * 1024 * 1024));
assert!(recommend_streaming(200 * 1024 * 1024));
assert!(!recommend_streaming(10 * 1024 * 1024));
}
// Round trip: serialize a single-run log, read it back through the run
// iterator, then check the run and the captured metadata.
#[test]
fn test_streaming_with_simple_sarif() {
let sarif = SarifLogBuilder::single_error("test-tool", "Test error message", "test.rs", 42)
.build_unchecked();
let json = crate::to_string(&sarif).unwrap();
let cursor = Cursor::new(json);
let mut parser = StreamingParser::new(cursor);
let runs: Result<Vec<_>, _> = parser.parse_runs().collect();
let runs = runs.unwrap();
assert_eq!(runs.len(), 1);
assert_eq!(runs[0].tool.driver.name, "test-tool");
let metadata = parser.metadata().unwrap();
assert_eq!(metadata.version, "2.1.0");
}
// Expects the callback to fire exactly once for a single-run log.
// NOTE(review): this test is marked #[ignore] and the reason is not recorded
// here — TODO confirm whether it can be re-enabled.
#[test]
#[ignore] fn test_streaming_processor() {
let sarif =
SarifLogBuilder::single_warning("analyzer", "Warning message", "src/lib.rs", 100)
.build_unchecked();
let json = crate::to_string(&sarif).unwrap();
let cursor = Cursor::new(json);
let mut processor = StreamingProcessor::new(cursor);
let mut run_count = 0;
let stats = processor
.process_with_callback(|_run, metadata| {
run_count += 1;
assert_eq!(metadata.version, "2.1.0");
Ok(())
})
.unwrap();
assert_eq!(run_count, 1);
assert_eq!(stats.runs_processed, 1);
}
// Folding one run built by `error_finding` into fresh stats is expected to
// give one processed run, one error and one finding in total.
#[test]
fn test_stats_update() {
let mut stats = StreamingStats::default();
let sarif = SarifLogBuilder::error_finding(
"tool",
"RULE001",
"Error message",
"file.rs",
1,
1,
1,
10,
)
.build_unchecked();
let run = &sarif.runs[0];
stats.update_with_run(run);
assert_eq!(stats.runs_processed, 1);
assert_eq!(stats.error_count, 1);
assert_eq!(stats.total_findings(), 1);
}
}