pub fn process_ndjson_chunked<R: BufRead, W: Write>(
view_definition: SofViewDefinition,
input: R,
output: W,
content_type: ContentType,
config: ChunkConfig,
) -> Result<ProcessingStats, SofError>
Process an NDJSON input stream and write output incrementally.
This is the main entry point for streaming/chunked NDJSON processing. It reads the input in chunks, processes each chunk through the ViewDefinition, and writes the output incrementally to the writer.
§Arguments
- view_definition - The ViewDefinition to execute
- input - A buffered reader for the NDJSON input
- output - A writer for the output (file, stdout, etc.)
- content_type - The desired output format (CSV, NDJSON, JSON)
- config - Configuration for chunk processing (a tuning sketch follows this list)
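The fields of ChunkConfig are not shown on this page. As a minimal sketch, assuming it is a plain struct with public fields implementing Default, and that skip_invalid_lines (referenced under §Errors below) is one of them:

use helios_sof::ChunkConfig;

// Start from the defaults, but tolerate malformed NDJSON lines instead of
// aborting. Struct-update syntax assumes public fields; consult the
// ChunkConfig docs for the actual field set.
let config = ChunkConfig {
    skip_invalid_lines: true,
    ..ChunkConfig::default()
};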
§Returns
Statistics about the processing run, including row counts and chunk counts.
§Examples
use helios_sof::{process_ndjson_chunked, SofViewDefinition, ContentType, ChunkConfig};
use std::fs::File;
use std::io::{BufReader, BufWriter};

// Set up the ViewDefinition
let view_json: serde_json::Value = serde_json::from_str(r#"{
    "resourceType": "ViewDefinition",
    "resource": "Patient",
    "select": [{"column": [{"name": "id", "path": "id"}]}]
}"#).unwrap();
let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json).unwrap();
let sof_view = SofViewDefinition::R4(view_def);

// Process a file, writing CSV output incrementally
let input = BufReader::new(File::open("patients.ndjson").unwrap());
let mut output = BufWriter::new(File::create("output.csv").unwrap());
let stats = process_ndjson_chunked(
    sof_view,
    input,
    &mut output,
    ContentType::CsvWithHeader,
    ChunkConfig::default(),
).unwrap();

println!(
    "Processed {} resources, {} output rows",
    stats.resources_processed, stats.output_rows
);
§Errors
Returns an error if:
- The ViewDefinition is invalid
- The input contains invalid JSON (when skip_invalid_lines is false)
- Writing to the output fails
- Parquet format is requested (not supported for streaming)
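Because the output can be any Write implementor, the same entry point works in a shell pipeline. A hedged sketch, assuming a sof_view built as in the example above and that ContentType has an NDJSON variant named NdJson (the exact variant name is an assumption):

use helios_sof::{process_ndjson_chunked, ContentType, ChunkConfig};
use std::io::{self, BufWriter};

// Stream NDJSON from stdin to stdout; StdinLock already implements BufRead,
// so it can be passed directly as the input.
let input = io::stdin().lock();
let mut output = BufWriter::new(io::stdout().lock());

match process_ndjson_chunked(
    sof_view,
    input,
    &mut output,
    ContentType::NdJson, // assumed variant name for NDJSON output
    ChunkConfig::default(),
) {
    Ok(stats) => eprintln!(
        "processed {} resources, {} rows",
        stats.resources_processed, stats.output_rows
    ),
    // Assumes SofError implements Display, as Rust error types conventionally do.
    Err(e) => eprintln!("processing failed: {e}"),
}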