Function process_ndjson_chunked 

pub fn process_ndjson_chunked<R: BufRead, W: Write>(
    view_definition: SofViewDefinition,
    input: R,
    output: W,
    content_type: ContentType,
    config: ChunkConfig,
) -> Result<ProcessingStats, SofError>

Process an NDJSON input stream and write output incrementally.

This is the main entry point for streaming/chunked NDJSON processing. It reads the input in chunks, processes each chunk through the ViewDefinition, and writes the results to the output writer incrementally.

§Arguments

  • view_definition - The ViewDefinition to execute
  • input - A buffered reader for the NDJSON input
  • output - A writer for the output (file, stdout, etc.)
  • content_type - The desired output format (CSV, NDJSON, JSON)
  • config - Configuration for chunk processing

§Returns

Statistics about the processing run, including row counts and chunk counts.

§Examples

use helios_sof::{process_ndjson_chunked, SofViewDefinition, ContentType, ChunkConfig};
use std::io::{BufReader, BufWriter};
use std::fs::File;

// Set up ViewDefinition
let view_json: serde_json::Value = serde_json::from_str(r#"{
    "resourceType": "ViewDefinition",
    "resource": "Patient",
    "select": [{"column": [{"name": "id", "path": "id"}]}]
}"#).unwrap();
let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json).unwrap();
let sof_view = SofViewDefinition::R4(view_def);

// Process file
let input = BufReader::new(File::open("patients.ndjson").unwrap());
let mut output = BufWriter::new(File::create("output.csv").unwrap());

let stats = process_ndjson_chunked(
    sof_view,
    input,
    &mut output,
    ContentType::CsvWithHeader,
    ChunkConfig::default(),
).unwrap();

println!("Processed {} resources, {} output rows",
    stats.resources_processed, stats.output_rows);
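
The same processing can stream to standard output instead of a file. The sketch below is illustrative only: it repeats the ViewDefinition setup shown above and assumes nothing beyond the API on this page; only the output writer changes.

use helios_sof::{process_ndjson_chunked, SofViewDefinition, ContentType, ChunkConfig};
use std::io::BufReader;
use std::fs::File;

// ViewDefinition constructed exactly as in the example above
let view_json: serde_json::Value = serde_json::from_str(r#"{
    "resourceType": "ViewDefinition",
    "resource": "Patient",
    "select": [{"column": [{"name": "id", "path": "id"}]}]
}"#).unwrap();
let view_def: helios_fhir::r4::ViewDefinition = serde_json::from_value(view_json).unwrap();
let sof_view = SofViewDefinition::R4(view_def);

// Stream CSV rows straight to stdout; locking once keeps one writer for the whole run
let input = BufReader::new(File::open("patients.ndjson").unwrap());
let stdout = std::io::stdout();
let mut output = stdout.lock();

let stats = process_ndjson_chunked(
    sof_view,
    input,
    &mut output,
    ContentType::CsvWithHeader,
    ChunkConfig::default(),
).unwrap();

eprintln!("Processed {} resources, {} output rows",
    stats.resources_processed, stats.output_rows);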

§Errors

Returns an error if:

  • The ViewDefinition is invalid
  • The input contains invalid JSON (when skip_invalid_lines is false)
  • Writing to the output fails
  • Parquet format is requested (not supported for streaming)
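
When these cases need to be reported rather than panicked on, the Result can be matched instead of unwrapped. This is a minimal sketch, assuming only that SofError implements Display (no specific error variants are named on this page); sof_view, input, and output are set up as in the example above.

// Minimal error-handling sketch (see §Examples for the surrounding setup)
match process_ndjson_chunked(
    sof_view,
    input,
    &mut output,
    ContentType::CsvWithHeader,
    ChunkConfig::default(),
) {
    Ok(stats) => println!("Processed {} resources, {} output rows",
        stats.resources_processed, stats.output_rows),
    // Assumption: SofError implements Display, so it can be printed directly
    Err(err) => eprintln!("NDJSON processing failed: {err}"),
}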