use crate::error::Result;
use std::io::{BufRead, Write};
/// Configuration for repairing text that is streamed from a reader to a writer
/// in line-aligned chunks rather than loaded into memory all at once.
#[derive(Debug, Clone)]
pub struct StreamingRepair {
    /// Number of accumulated input bytes after which the pending chunk is
    /// repaired and written out.
    buffer_size: usize,
}
impl StreamingRepair {
    /// Chunk size used by [`StreamingRepair::new`], in bytes.
    const DEFAULT_BUFFER_SIZE: usize = 8192;

    /// Upper bound on the up-front buffer allocation. A very large
    /// `buffer_size` (e.g. `usize::MAX` to mean "one chunk") must not trigger
    /// a capacity-overflow panic; the buffer still grows on demand past this.
    const MAX_PREALLOCATION: usize = 1 << 20;

    /// Creates a processor that repairs input in chunks of roughly 8 KiB.
    #[must_use]
    pub fn new() -> Self {
        Self::with_buffer_size(Self::DEFAULT_BUFFER_SIZE)
    }

    /// Creates a processor that repairs and writes a chunk once at least
    /// `buffer_size` bytes of input have accumulated.
    ///
    /// Input is only split on line boundaries, so a chunk can exceed
    /// `buffer_size` by up to one line. A `buffer_size` of `0` repairs every
    /// line on its own.
    #[must_use]
    pub fn with_buffer_size(buffer_size: usize) -> Self {
        Self { buffer_size }
    }

    /// Reads `reader` line by line, repairs the text in chunks of about
    /// `buffer_size` bytes, and writes the repaired text to `writer`.
    ///
    /// `format` is resolved once via `crate::normalize_format`:
    /// - If it normalizes to `"auto"`, every chunk is repaired with `crate::repair`.
    /// - If `crate::create_repairer` rejects it, the same fallback applies.
    /// - Otherwise every chunk is repaired with `crate::repair_with_format`.
    ///
    /// Line terminators are passed to the repairer exactly as read. CRLF is not
    /// rewritten, and no newline is appended after a final unterminated line.
    ///
    /// Each chunk is repaired independently. A single document that spans a
    /// chunk boundary is therefore repaired piecewise. Choose a `buffer_size`
    /// larger than any individual document if that matters.
    ///
    /// Returns the total number of repaired bytes written to `writer`.
    ///
    /// # Errors
    ///
    /// - Returns `RepairError::Generic` if reading from `reader` fails,
    ///   including input that is not valid UTF-8.
    /// - Returns `RepairError::Generic` if writing to `writer` fails.
    /// - Propagates any error returned by the repair function.
    pub fn process<R: BufRead, W: Write>(
        &self,
        mut reader: R,
        writer: &mut W,
        format: &str,
    ) -> Result<usize> {
        // Resolve the repair strategy once, instead of re-normalizing the
        // format and building a throwaway repairer for every chunk.
        let normalized = crate::normalize_format(format);
        let use_auto = normalized == "auto" || crate::create_repairer(normalized).is_err();

        let mut total_bytes = 0;
        // Repairs one chunk, writes it, and accounts for the bytes written.
        let mut emit = |chunk: &str| -> Result<()> {
            let repaired = if use_auto {
                crate::repair(chunk)?
            } else {
                crate::repair_with_format(chunk, normalized)?
            };
            writer.write_all(repaired.as_bytes()).map_err(|e| {
                crate::error::RepairError::Generic(format!("Write error: {}", e))
            })?;
            total_bytes += repaired.len();
            Ok(())
        };

        let mut buffer = String::with_capacity(self.buffer_size.min(Self::MAX_PREALLOCATION));
        loop {
            // `read_line` appends to `buffer`, including the line terminator.
            // Lines therefore accumulate in place without a per-line allocation.
            let read = reader
                .read_line(&mut buffer)
                .map_err(|e| crate::error::RepairError::Generic(format!("IO error: {}", e)))?;
            if read == 0 {
                break;
            }
            if buffer.len() >= self.buffer_size {
                emit(&buffer)?;
                buffer.clear();
            }
        }
        if !buffer.is_empty() {
            emit(&buffer)?;
        }
        Ok(total_bytes)
    }
}
impl Default for StreamingRepair {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Streams `input` through `processor` as `format`, requires success, and
    /// returns the reported byte count together with the produced text.
    fn run(processor: &StreamingRepair, input: &str, format: &str) -> (usize, String) {
        let mut sink = Vec::new();
        let outcome = processor.process(Cursor::new(input), &mut sink, format);
        assert!(outcome.is_ok());
        let written = outcome.unwrap();
        let text = String::from_utf8(sink).unwrap();
        (written, text)
    }

    #[test]
    fn test_streaming_json_repair() {
        let input = "{\"name\": \"John\",\n\"age\": 30,\n\"city\": \"NYC\",}";
        let (_, text) = run(&StreamingRepair::new(), input, "json");
        assert!(text.contains("\"name\""));
        assert!(text.contains("\"age\""));
    }

    #[test]
    fn test_streaming_yaml_repair() {
        let (_, text) = run(&StreamingRepair::new(), "name: John\nage: 30\ncity: NYC", "yaml");
        assert!(text.contains("name"));
    }

    #[test]
    fn test_streaming_with_custom_buffer() {
        let processor = StreamingRepair::with_buffer_size(256);
        let (written, _) = run(&processor, "{\"key\": \"value\",}", "json");
        assert!(written > 0);
    }

    #[test]
    fn test_streaming_large_file_simulation() {
        let input: String = (0..100)
            .map(|i| format!("{{\"id\": {}, \"value\": \"item\",}}\n", i))
            .collect();
        let processor = StreamingRepair::with_buffer_size(512);
        let (written, _) = run(&processor, &input, "json");
        assert!(written > 0);
    }

    #[test]
    fn test_streaming_markdown_repair() {
        let input = "# Header\n\nSome content\n\n## Subheader";
        let (_, text) = run(&StreamingRepair::new(), input, "markdown");
        assert!(text.contains("Header"));
    }

    #[test]
    fn test_streaming_auto_detect() {
        run(&StreamingRepair::new(), "{\"test\": \"value\",}", "auto");
    }

    #[test]
    fn test_streaming_empty_input() {
        let (written, _) = run(&StreamingRepair::new(), "", "json");
        assert_eq!(written, 0);
    }

    #[test]
    fn test_streaming_csv_repair() {
        let input = "name,age,city\nJohn,30,NYC\nJane,25,LA";
        let (_, text) = run(&StreamingRepair::new(), input, "csv");
        assert!(text.contains("name"));
    }
}