pub mod data_source;
pub mod parquet_schema;
pub mod traits;
use chrono::{DateTime, Utc};
use helios_fhirpath::{EvaluationContext, EvaluationResult, evaluate_expression};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{BufRead, Write};
use thiserror::Error;
use traits::*;
pub use helios_fhir::FhirVersion;
pub use traits::{BundleTrait, ResourceTrait, ViewDefinitionTrait};
#[derive(Debug, Clone)]
pub enum SofViewDefinition {
#[cfg(feature = "R4")]
R4(helios_fhir::r4::ViewDefinition),
#[cfg(feature = "R4B")]
R4B(helios_fhir::r4b::ViewDefinition),
#[cfg(feature = "R5")]
R5(helios_fhir::r5::ViewDefinition),
#[cfg(feature = "R6")]
R6(helios_fhir::r6::ViewDefinition),
}
impl SofViewDefinition {
pub fn version(&self) -> helios_fhir::FhirVersion {
match self {
#[cfg(feature = "R4")]
SofViewDefinition::R4(_) => helios_fhir::FhirVersion::R4,
#[cfg(feature = "R4B")]
SofViewDefinition::R4B(_) => helios_fhir::FhirVersion::R4B,
#[cfg(feature = "R5")]
SofViewDefinition::R5(_) => helios_fhir::FhirVersion::R5,
#[cfg(feature = "R6")]
SofViewDefinition::R6(_) => helios_fhir::FhirVersion::R6,
}
}
}
#[derive(Debug, Clone)]
pub enum SofBundle {
#[cfg(feature = "R4")]
R4(helios_fhir::r4::Bundle),
#[cfg(feature = "R4B")]
R4B(helios_fhir::r4b::Bundle),
#[cfg(feature = "R5")]
R5(helios_fhir::r5::Bundle),
#[cfg(feature = "R6")]
R6(helios_fhir::r6::Bundle),
}
impl SofBundle {
pub fn version(&self) -> helios_fhir::FhirVersion {
match self {
#[cfg(feature = "R4")]
SofBundle::R4(_) => helios_fhir::FhirVersion::R4,
#[cfg(feature = "R4B")]
SofBundle::R4B(_) => helios_fhir::FhirVersion::R4B,
#[cfg(feature = "R5")]
SofBundle::R5(_) => helios_fhir::FhirVersion::R5,
#[cfg(feature = "R6")]
SofBundle::R6(_) => helios_fhir::FhirVersion::R6,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SofCapabilityStatement {
#[cfg(feature = "R4")]
R4(helios_fhir::r4::CapabilityStatement),
#[cfg(feature = "R4B")]
R4B(helios_fhir::r4b::CapabilityStatement),
#[cfg(feature = "R5")]
R5(helios_fhir::r5::CapabilityStatement),
#[cfg(feature = "R6")]
R6(helios_fhir::r6::CapabilityStatement),
}
impl SofCapabilityStatement {
pub fn version(&self) -> helios_fhir::FhirVersion {
match self {
#[cfg(feature = "R4")]
SofCapabilityStatement::R4(_) => helios_fhir::FhirVersion::R4,
#[cfg(feature = "R4B")]
SofCapabilityStatement::R4B(_) => helios_fhir::FhirVersion::R4B,
#[cfg(feature = "R5")]
SofCapabilityStatement::R5(_) => helios_fhir::FhirVersion::R5,
#[cfg(feature = "R6")]
SofCapabilityStatement::R6(_) => helios_fhir::FhirVersion::R6,
}
}
}
/// Alias for [`helios_fhir::VersionIndependentParameters`], exposed under a `Sof` name
/// alongside the other `Sof*` wrapper types in this module.
pub type SofParameters = helios_fhir::VersionIndependentParameters;
/// Errors produced while validating, evaluating, reading input for, or formatting the
/// output of a ViewDefinition run.
#[derive(Debug, Error)]
pub enum SofError {
/// The ViewDefinition is structurally invalid or uses an unsupported construct.
#[error("Invalid ViewDefinition: {0}")]
InvalidViewDefinition(String),
/// Evaluating a FHIRPath expression failed.
#[error("FHIRPath evaluation error: {0}")]
FhirPathError(String),
/// JSON (de)serialization failed; converted automatically via `?`.
#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
/// The `csv` crate reported an error; converted automatically via `?`.
#[error("CSV error: {0}")]
CsvError(#[from] csv::Error),
/// An I/O error; converted automatically via `?`.
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
/// The requested output format / media type is not recognised or not supported here.
#[error("Unsupported content type: {0}")]
UnsupportedContentType(String),
/// CSV writing failed for a reason not covered by [`SofError::CsvError`].
#[error("CSV writer error: {0}")]
CsvWriterError(String),
/// The input source specification is invalid.
#[error("Invalid source: {0}")]
InvalidSource(String),
/// The input source could not be found.
#[error("Source not found: {0}")]
SourceNotFound(String),
/// Fetching the input source failed.
#[error("Failed to fetch source: {0}")]
SourceFetchError(String),
/// Reading the input source failed.
#[error("Failed to read source: {0}")]
SourceReadError(String),
/// The input content could not be parsed, e.g. invalid NDJSON lines or resources that do
/// not deserialize for the selected FHIR version.
#[error("Invalid source content: {0}")]
InvalidSourceContent(String),
/// The source URL/protocol scheme is not supported.
#[error("Unsupported source protocol: {0}")]
UnsupportedSourceProtocol(String),
/// Converting results to Parquet failed.
#[error("Parquet conversion error: {0}")]
ParquetConversionError(String),
}
/// Output format produced by a ViewDefinition run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentType {
    /// CSV without a header row.
    Csv,
    /// CSV whose first record lists the column names.
    CsvWithHeader,
    /// A single JSON array of row objects.
    Json,
    /// Newline-delimited JSON, one row object per line.
    NdJson,
    /// Apache Parquet.
    Parquet,
}
impl ContentType {
    /// Parses either a short format name (`csv`, `json`, `ndjson`, `parquet`) or a MIME type
    /// (`text/csv`, `application/json`, `application/ndjson`, ...).
    ///
    /// Matching ignores ASCII case, surrounding whitespace, and whitespace around the `;`
    /// parameter separator. Media types are case-insensitive in HTTP, and clients commonly
    /// send `text/csv; header=false` with a space. Plain `csv` / `text/csv` select
    /// [`ContentType::CsvWithHeader`].
    ///
    /// # Errors
    /// Returns [`SofError::UnsupportedContentType`] carrying the original, un-normalised
    /// input when the value is not recognised.
    pub fn from_string(s: &str) -> Result<Self, SofError> {
        let normalized = s
            .trim()
            .split(';')
            .map(|part| part.trim().to_ascii_lowercase())
            .collect::<Vec<_>>()
            .join(";");
        match normalized.as_str() {
            "csv" => Ok(ContentType::CsvWithHeader),
            "json" => Ok(ContentType::Json),
            "ndjson" => Ok(ContentType::NdJson),
            "parquet" => Ok(ContentType::Parquet),
            "text/csv;header=false" => Ok(ContentType::Csv),
            "text/csv" | "text/csv;header=true" => Ok(ContentType::CsvWithHeader),
            "application/json" => Ok(ContentType::Json),
            "application/ndjson" | "application/x-ndjson" => Ok(ContentType::NdJson),
            "application/parquet" => Ok(ContentType::Parquet),
            _ => Err(SofError::UnsupportedContentType(s.to_string())),
        }
    }
}
pub fn get_fhir_version_string() -> &'static str {
let newest_version = get_newest_enabled_fhir_version();
match newest_version {
#[cfg(feature = "R4")]
helios_fhir::FhirVersion::R4 => "4.0.1",
#[cfg(feature = "R4B")]
helios_fhir::FhirVersion::R4B => "4.3.0",
#[cfg(feature = "R5")]
helios_fhir::FhirVersion::R5 => "5.0.0",
#[cfg(feature = "R6")]
helios_fhir::FhirVersion::R6 => "6.0.0",
}
}
/// Returns the newest FHIR version enabled through Cargo features, preferring
/// R6, then R5, then R4B, then R4.
///
/// Each `return` below is gated so that exactly one of them is compiled for any feature
/// combination; the `cfg` conditions are mutually exclusive by construction.
///
/// # Panics
/// Panics if no FHIR version feature (`R4`, `R4B`, `R5`, `R6`) is enabled.
pub fn get_newest_enabled_fhir_version() -> helios_fhir::FhirVersion {
#[cfg(feature = "R6")]
return helios_fhir::FhirVersion::R6;
// R5 only wins when R6 is absent.
#[cfg(all(feature = "R5", not(feature = "R6")))]
return helios_fhir::FhirVersion::R5;
#[cfg(all(feature = "R4B", not(feature = "R5"), not(feature = "R6")))]
return helios_fhir::FhirVersion::R4B;
#[cfg(all(
feature = "R4",
not(feature = "R4B"),
not(feature = "R5"),
not(feature = "R6")
))]
return helios_fhir::FhirVersion::R4;
// Only compiled when no version feature is enabled at all.
#[cfg(not(any(feature = "R4", feature = "R4B", feature = "R5", feature = "R6")))]
panic!("At least one FHIR version feature must be enabled");
}
/// One output row: one value per column, positionally aligned with
/// [`ProcessedResult::columns`]. `None` means the column has no value for this row.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessedRow {
pub values: Vec<Option<serde_json::Value>>,
}
/// The full tabular result of applying a ViewDefinition: column names plus the rows.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessedResult {
/// Column names in output order.
pub columns: Vec<String>,
/// Rows whose `values` are indexed like `columns`.
pub rows: Vec<ProcessedRow>,
}
/// Runs `view_definition` over `bundle` and encodes the result as `content_type`, using
/// default [`RunOptions`] (no `since` filter, no pagination, default Parquet settings).
///
/// # Errors
/// Propagates any error from [`run_view_definition_with_options`].
pub fn run_view_definition(
    view_definition: SofViewDefinition,
    bundle: SofBundle,
    content_type: ContentType,
) -> Result<Vec<u8>, SofError> {
    let defaults = RunOptions::default();
    run_view_definition_with_options(view_definition, bundle, content_type, defaults)
}
/// Tuning knobs for Parquet output. These are consumed by the Parquet writer elsewhere in
/// the crate; the unit suffixes below follow the field names.
#[derive(Debug, Clone)]
pub struct ParquetOptions {
    /// Row-group size target, in megabytes.
    pub row_group_size_mb: u32,
    /// Page size target, in kilobytes.
    pub page_size_kb: u32,
    /// Compression codec name (default `"snappy"`).
    pub compression: String,
    /// Optional maximum output file size, in megabytes; `None` means no limit is requested.
    pub max_file_size_mb: Option<u32>,
}
impl Default for ParquetOptions {
    /// 256 MB row groups, 1024 KB pages, snappy compression, no file size limit.
    fn default() -> Self {
        let compression = String::from("snappy");
        ParquetOptions {
            compression,
            max_file_size_mb: None,
            page_size_kb: 1024,
            row_group_size_mb: 256,
        }
    }
}
/// Optional controls for [`run_view_definition_with_options`]. `Default` leaves all of them unset.
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
/// When set, the bundle is first filtered with `filter_bundle_by_since` using this instant.
pub since: Option<DateTime<Utc>>,
/// Page size passed to `apply_pagination_to_result`; pagination runs only if this or
/// `page` is set.
pub limit: Option<usize>,
/// Page number passed to `apply_pagination_to_result` together with `limit`.
pub page: Option<usize>,
/// Parquet settings forwarded to `format_output`.
pub parquet_options: Option<ParquetOptions>,
}
/// Configuration for chunked NDJSON reading.
#[derive(Debug, Clone)]
pub struct ChunkConfig {
    /// Maximum number of parsed resources per chunk.
    pub chunk_size: usize,
    /// When `true`, lines that are not valid JSON are counted and skipped instead of
    /// producing an error.
    pub skip_invalid_lines: bool,
}
impl Default for ChunkConfig {
    /// 1000 resources per chunk; invalid lines are treated as errors.
    fn default() -> Self {
        const DEFAULT_CHUNK_SIZE: usize = 1000;
        ChunkConfig {
            skip_invalid_lines: false,
            chunk_size: DEFAULT_CHUNK_SIZE,
        }
    }
}
/// A batch of raw JSON resources read from NDJSON input.
#[derive(Debug)]
pub struct ResourceChunk {
/// Parsed resources in input order.
pub resources: Vec<serde_json::Value>,
/// Zero-based sequence number of this chunk.
pub chunk_index: usize,
/// `true` when end of input was reached while building this chunk. A final chunk that
/// fills `chunk_size` exactly may report `false`; the next call then yields `None`.
pub is_last: bool,
}
/// Rows produced by applying a ViewDefinition to one [`ResourceChunk`].
#[derive(Debug, Clone)]
pub struct ChunkedResult {
/// Column names, identical for every chunk of a run.
pub columns: Vec<String>,
/// Output rows for this chunk, aligned with `columns`.
pub rows: Vec<ProcessedRow>,
/// Copied from the input chunk.
pub chunk_index: usize,
/// Copied from the input chunk.
pub is_last: bool,
/// Number of resources in the input chunk, including any whose `resourceType` did not
/// match the ViewDefinition.
pub resources_in_chunk: usize,
}
/// Counters reported by `process_ndjson_chunked`.
#[derive(Debug, Clone, Default)]
pub struct ProcessingStats {
/// Lines read from the input, including blank and skipped lines.
pub total_lines_read: usize,
/// Sum of `resources_in_chunk` over all processed chunks.
pub resources_processed: usize,
/// Total number of rows written.
pub output_rows: usize,
/// Invalid JSON lines skipped because `skip_invalid_lines` was enabled.
pub skipped_lines: usize,
/// Number of chunks processed.
pub chunks_processed: usize,
}
/// Reads newline-delimited JSON from a [`BufRead`] source and yields it in
/// [`ResourceChunk`]s of at most `chunk_size` parsed resources (minimum 1).
///
/// Blank lines are ignored. Resources whose `resourceType` does not match the optional filter
/// are dropped. Invalid JSON is either counted and skipped (when
/// [`ChunkConfig::skip_invalid_lines`] is set) or reported as an `Err` item.
///
/// When an error occurs after some resources of the current chunk were already parsed, that
/// partial chunk is yielded first and the error on the following call, so valid resources are
/// never discarded. After a JSON error, iteration may continue with the next line.
pub struct NdjsonChunkReader<R: BufRead> {
    reader: R,
    config: ChunkConfig,
    /// Index assigned to the next emitted chunk.
    current_chunk: usize,
    /// Set once the underlying reader reports end of input.
    finished: bool,
    /// Line buffer reused across reads to avoid an allocation per line.
    line_buffer: String,
    /// Number of lines read so far (the 1-based number of the last line read).
    line_number: usize,
    /// When set, only resources whose `resourceType` equals this value are kept.
    resource_type_filter: Option<String>,
    /// Invalid JSON lines skipped because `skip_invalid_lines` was enabled.
    skipped_lines: usize,
    /// Error deferred so that the partial chunk preceding it could be yielded first.
    pending_error: Option<SofError>,
}
impl<R: BufRead> NdjsonChunkReader<R> {
    /// Creates a reader over `reader` with no resource type filter.
    pub fn new(reader: R, config: ChunkConfig) -> Self {
        Self {
            reader,
            config,
            current_chunk: 0,
            finished: false,
            line_buffer: String::new(),
            line_number: 0,
            resource_type_filter: None,
            skipped_lines: 0,
            pending_error: None,
        }
    }
    /// Keeps only resources whose `resourceType` equals `resource_type`; `None` keeps all.
    pub fn with_resource_type_filter(mut self, resource_type: Option<String>) -> Self {
        self.resource_type_filter = resource_type;
        self
    }
    /// Number of lines read so far, including blank and skipped lines.
    pub fn lines_read(&self) -> usize {
        self.line_number
    }
    /// Number of invalid JSON lines skipped so far.
    pub fn skipped_lines(&self) -> usize {
        self.skipped_lines
    }
    /// Returns whether `value` passes the optional `resourceType` filter.
    fn accepts(&self, value: &serde_json::Value) -> bool {
        self.resource_type_filter.as_deref().is_none_or(|wanted| {
            value.get("resourceType").and_then(|v| v.as_str()) == Some(wanted)
        })
    }
}
impl<R: BufRead> Iterator for NdjsonChunkReader<R> {
    type Item = Result<ResourceChunk, SofError>;
    fn next(&mut self) -> Option<Self::Item> {
        // Report an error deferred from the previous call before reading further.
        if let Some(err) = self.pending_error.take() {
            return Some(Err(err));
        }
        if self.finished {
            return None;
        }
        // A chunk size of 0 would otherwise yield empty chunks forever without reading input.
        let chunk_size = self.config.chunk_size.max(1);
        let mut resources = Vec::with_capacity(chunk_size);
        while resources.len() < chunk_size {
            self.line_buffer.clear();
            let error = match self.reader.read_line(&mut self.line_buffer) {
                Ok(0) => {
                    self.finished = true;
                    break;
                }
                Ok(_) => {
                    self.line_number += 1;
                    let line = self.line_buffer.trim();
                    if line.is_empty() {
                        continue;
                    }
                    match serde_json::from_str::<serde_json::Value>(line) {
                        Ok(value) => {
                            if self.accepts(&value) {
                                resources.push(value);
                            }
                            continue;
                        }
                        Err(_) if self.config.skip_invalid_lines => {
                            self.skipped_lines += 1;
                            continue;
                        }
                        Err(e) => SofError::InvalidSourceContent(format!(
                            "Invalid JSON at line {}: {}",
                            self.line_number, e
                        )),
                    }
                }
                Err(e) => SofError::IoError(e),
            };
            if resources.is_empty() {
                return Some(Err(error));
            }
            // Yield what was read so far; the error is reported on the next call.
            self.pending_error = Some(error);
            break;
        }
        if resources.is_empty() && self.finished {
            return None;
        }
        let chunk = ResourceChunk {
            resources,
            chunk_index: self.current_chunk,
            is_last: self.finished,
        };
        self.current_chunk += 1;
        Some(Ok(chunk))
    }
}
/// A [`SofViewDefinition`] that has been validated and analysed once up front, so it can be
/// applied repeatedly to chunks of resources without redoing that work.
#[derive(Debug, Clone)]
pub struct PreparedViewDefinition {
    /// The original ViewDefinition, kept for version dispatch and per-resource evaluation.
    view_definition: SofViewDefinition,
    /// Value of the ViewDefinition's `resource` element.
    target_resource_type: String,
    /// ViewDefinition constants keyed as `%name`, injected into every evaluation context.
    variables: HashMap<String, EvaluationResult>,
    /// Output column names in first-seen order across the whole select tree.
    column_names: Vec<String>,
}
impl PreparedViewDefinition {
    /// Validates `view_definition` and precomputes its target resource type, constants and
    /// output columns.
    ///
    /// # Errors
    /// Returns [`SofError::InvalidViewDefinition`] (or any error raised while converting
    /// constants) when the ViewDefinition cannot be used.
    pub fn new(view_definition: SofViewDefinition) -> Result<Self, SofError> {
        let (target_resource_type, variables, column_names) = match &view_definition {
            #[cfg(feature = "R4")]
            SofViewDefinition::R4(vd) => Self::analyze(vd)?,
            #[cfg(feature = "R4B")]
            SofViewDefinition::R4B(vd) => Self::analyze(vd)?,
            #[cfg(feature = "R5")]
            SofViewDefinition::R5(vd) => Self::analyze(vd)?,
            #[cfg(feature = "R6")]
            SofViewDefinition::R6(vd) => Self::analyze(vd)?,
        };
        Ok(Self {
            view_definition,
            target_resource_type,
            variables,
            column_names,
        })
    }
    /// Version-independent part of [`Self::new`]: validation, constant extraction, target
    /// resource type lookup and column collection.
    fn analyze<VD>(
        vd: &VD,
    ) -> Result<(String, HashMap<String, EvaluationResult>, Vec<String>), SofError>
    where
        VD: ViewDefinitionTrait,
        VD::Select: ViewDefinitionSelectTrait,
    {
        validate_view_definition(vd)?;
        let variables = extract_view_definition_constants(vd)?;
        let resource_type = vd
            .resource()
            .ok_or_else(|| {
                SofError::InvalidViewDefinition("Resource type is required".to_string())
            })?
            .to_string();
        let mut columns = Vec::new();
        if let Some(selects) = vd.select() {
            collect_all_columns(selects, &mut columns)?;
        }
        Ok((resource_type, variables, columns))
    }
    /// Output column names, in output order.
    pub fn columns(&self) -> &[String] {
        &self.column_names
    }
    /// Resource type this ViewDefinition applies to.
    pub fn target_resource_type(&self) -> &str {
        &self.target_resource_type
    }
    /// Evaluates every resource in `chunk` whose `resourceType` matches the target, in
    /// parallel, and concatenates the resulting rows in input order.
    ///
    /// # Errors
    /// Returns an error if any matching resource fails to parse or evaluate.
    pub fn process_chunk(&self, chunk: ResourceChunk) -> Result<ChunkedResult, SofError> {
        let results: Result<Vec<Vec<ProcessedRow>>, SofError> = chunk
            .resources
            .par_iter()
            .filter_map(|resource_json| {
                let resource_type = resource_json
                    .get("resourceType")
                    .and_then(|v| v.as_str())
                    .unwrap_or("");
                if resource_type != self.target_resource_type {
                    None
                } else {
                    Some(self.process_single_resource(resource_json))
                }
            })
            .collect();
        let all_rows: Vec<ProcessedRow> = results?.into_iter().flatten().collect();
        Ok(ChunkedResult {
            columns: self.column_names.clone(),
            rows: all_rows,
            chunk_index: chunk.chunk_index,
            is_last: chunk.is_last,
            resources_in_chunk: chunk.resources.len(),
        })
    }
    /// Dispatches to [`Self::process_single_resource_generic`] for the wrapped version.
    fn process_single_resource(
        &self,
        resource_json: &serde_json::Value,
    ) -> Result<Vec<ProcessedRow>, SofError> {
        match &self.view_definition {
            #[cfg(feature = "R4")]
            SofViewDefinition::R4(vd) => self.process_single_resource_generic(vd, resource_json),
            #[cfg(feature = "R4B")]
            SofViewDefinition::R4B(vd) => self.process_single_resource_generic(vd, resource_json),
            #[cfg(feature = "R5")]
            SofViewDefinition::R5(vd) => self.process_single_resource_generic(vd, resource_json),
            #[cfg(feature = "R6")]
            SofViewDefinition::R6(vd) => self.process_single_resource_generic(vd, resource_json),
        }
    }
    /// Parses one JSON resource, applies the where clauses, and generates its rows.
    /// Returns no rows when any where clause is falsy.
    fn process_single_resource_generic<VD>(
        &self,
        view_definition: &VD,
        resource_json: &serde_json::Value,
    ) -> Result<Vec<ProcessedRow>, SofError>
    where
        VD: ViewDefinitionTrait,
        VD::Select: ViewDefinitionSelectTrait,
    {
        let fhir_resource =
            parse_json_to_fhir_resource(resource_json.clone(), self.view_definition.version())?;
        let mut context = EvaluationContext::new(vec![fhir_resource]);
        for (name, value) in &self.variables {
            context.set_variable_result(name, value.clone());
        }
        if let Some(where_clauses) = view_definition.where_clauses() {
            for where_clause in where_clauses {
                let path = where_clause.path().ok_or_else(|| {
                    SofError::InvalidViewDefinition("Where clause path is required".to_string())
                })?;
                match evaluate_expression(path, &context) {
                    Ok(result) => {
                        // Same wording as the batch path (`apply_where_clauses`).
                        if !can_be_coerced_to_boolean(&result) {
                            return Err(SofError::InvalidViewDefinition(format!(
                                "Where clause path '{}' returns type '{}' which cannot be used as a boolean condition. \
                                Where clauses must return boolean values, collections, or empty results.",
                                path,
                                result.type_name()
                            )));
                        }
                        if !is_truthy(&result) {
                            return Ok(Vec::new());
                        }
                    }
                    Err(e) => {
                        return Err(SofError::FhirPathError(format!(
                            "Error evaluating where clause '{}': {}",
                            path, e
                        )));
                    }
                }
            }
        }
        let select_clauses = view_definition.select().ok_or_else(|| {
            SofError::InvalidViewDefinition("At least one select clause is required".to_string())
        })?;
        let mut all_columns = self.column_names.clone();
        generate_row_combinations(&context, select_clauses, &mut all_columns, &self.variables)
    }
}
/// Streams NDJSON input through a [`PreparedViewDefinition`], yielding one
/// [`ChunkedResult`] per chunk read.
pub struct NdjsonChunkIterator<R: BufRead> {
    reader: NdjsonChunkReader<R>,
    prepared_vd: PreparedViewDefinition,
}
impl<R: BufRead> NdjsonChunkIterator<R> {
    /// Prepares `view_definition` and wraps `reader` in a chunk reader that keeps only
    /// resources of the ViewDefinition's target type.
    ///
    /// # Errors
    /// Fails if the ViewDefinition cannot be prepared.
    pub fn new(
        view_definition: SofViewDefinition,
        reader: R,
        config: ChunkConfig,
    ) -> Result<Self, SofError> {
        let prepared_vd = PreparedViewDefinition::new(view_definition)?;
        let type_filter = Some(String::from(prepared_vd.target_resource_type()));
        Ok(Self {
            reader: NdjsonChunkReader::new(reader, config).with_resource_type_filter(type_filter),
            prepared_vd,
        })
    }
    /// Output column names.
    pub fn columns(&self) -> &[String] {
        self.prepared_vd.columns()
    }
    /// Lines read from the input so far.
    pub fn lines_read(&self) -> usize {
        self.reader.lines_read()
    }
    /// Invalid lines skipped so far.
    pub fn skipped_lines(&self) -> usize {
        self.reader.skipped_lines()
    }
}
impl<R: BufRead> Iterator for NdjsonChunkIterator<R> {
    type Item = Result<ChunkedResult, SofError>;
    fn next(&mut self) -> Option<Self::Item> {
        let chunk = self.reader.next()?;
        Some(chunk.and_then(|c| self.prepared_vd.process_chunk(c)))
    }
}
fn write_csv_header<W: Write>(columns: &[String], writer: &mut W) -> Result<(), SofError> {
let mut wtr = csv::Writer::from_writer(writer);
wtr.write_record(columns)?;
wtr.flush()?;
Ok(())
}
/// Writes each row of `result` as a CSV record (no header) and flushes.
///
/// JSON strings are written raw, other JSON values as their JSON text, and missing values
/// as empty cells.
fn write_csv_chunk<W: Write>(result: &ChunkedResult, writer: &mut W) -> Result<(), SofError> {
    let render = |cell: &Option<serde_json::Value>| -> String {
        match cell {
            None => String::new(),
            Some(serde_json::Value::String(text)) => text.clone(),
            Some(other) => serde_json::to_string(other).unwrap_or_default(),
        }
    };
    let mut csv_writer = csv::Writer::from_writer(writer);
    for row in result.rows.iter() {
        let record = row.values.iter().map(render).collect::<Vec<String>>();
        csv_writer.write_record(&record)?;
    }
    csv_writer.flush()?;
    Ok(())
}
/// Writes each row of `result` as one compact JSON object per line, keyed by column name;
/// missing values become `null`.
fn write_ndjson_chunk<W: Write>(result: &ChunkedResult, writer: &mut W) -> Result<(), SofError> {
    for row in &result.rows {
        let object: serde_json::Map<String, serde_json::Value> = result
            .columns
            .iter()
            .enumerate()
            .map(|(idx, name)| {
                let cell = row
                    .values
                    .get(idx)
                    .and_then(Option::as_ref)
                    .cloned()
                    .unwrap_or(serde_json::Value::Null);
                (name.clone(), cell)
            })
            .collect();
        let encoded = serde_json::to_string(&serde_json::Value::Object(object))?;
        writer.write_all(encoded.as_bytes())?;
        writer.write_all(b"\n")?;
    }
    Ok(())
}
/// Streams NDJSON `input` through `view_definition` chunk by chunk and writes the rows to
/// `output` as each chunk is processed. This keeps memory bounded by the chunk size.
///
/// Supported formats: CSV (with or without header), NDJSON, and JSON. JSON output is a single
/// array whose elements are pretty-printed row objects.
///
/// # Errors
/// - [`SofError::UnsupportedContentType`] for [`ContentType::Parquet`].
/// - Any error from preparing the ViewDefinition, reading or parsing the input, evaluating
///   resources, or writing output. Output already written before the error is not rolled back.
pub fn process_ndjson_chunked<R: BufRead, W: Write>(
    view_definition: SofViewDefinition,
    input: R,
    mut output: W,
    content_type: ContentType,
    config: ChunkConfig,
) -> Result<ProcessingStats, SofError> {
    if content_type == ContentType::Parquet {
        return Err(SofError::UnsupportedContentType(
            "Parquet output is not supported for streaming. Use batch processing instead."
                .to_string(),
        ));
    }
    let mut iterator = NdjsonChunkIterator::new(view_definition, input, config)?;
    let columns = iterator.columns().to_vec();
    let mut stats = ProcessingStats::default();
    // Whether a JSON array element has been written yet. This is tracked per row, not per
    // chunk. If the first chunk yields no rows, a per-chunk flag would still be cleared, and
    // the next row would be preceded by a stray comma (`[\n,\n{...}`), producing invalid JSON.
    let mut wrote_json_row = false;
    if content_type == ContentType::CsvWithHeader {
        write_csv_header(&columns, &mut output)?;
    }
    if content_type == ContentType::Json {
        output.write_all(b"[\n")?;
    }
    for result in iterator.by_ref() {
        let chunk_result = result?;
        stats.resources_processed += chunk_result.resources_in_chunk;
        stats.output_rows += chunk_result.rows.len();
        stats.chunks_processed += 1;
        match content_type {
            ContentType::Csv | ContentType::CsvWithHeader => {
                write_csv_chunk(&chunk_result, &mut output)?;
            }
            ContentType::NdJson => {
                write_ndjson_chunk(&chunk_result, &mut output)?;
            }
            ContentType::Json => {
                for row in &chunk_result.rows {
                    if wrote_json_row {
                        output.write_all(b",\n")?;
                    }
                    let mut row_obj = serde_json::Map::new();
                    for (j, column) in chunk_result.columns.iter().enumerate() {
                        let value = row
                            .values
                            .get(j)
                            .and_then(|v| v.as_ref())
                            .cloned()
                            .unwrap_or(serde_json::Value::Null);
                        row_obj.insert(column.clone(), value);
                    }
                    let json = serde_json::to_string_pretty(&serde_json::Value::Object(row_obj))?;
                    output.write_all(json.as_bytes())?;
                    wrote_json_row = true;
                }
            }
            // Rejected at the top of the function, before any input is read.
            ContentType::Parquet => unreachable!(),
        }
        output.flush()?;
    }
    if content_type == ContentType::Json {
        output.write_all(b"\n]")?;
    }
    output.flush()?;
    stats.total_lines_read = iterator.lines_read();
    stats.skipped_lines = iterator.skipped_lines();
    Ok(stats)
}
/// Returns an iterator that applies `view_definition` to NDJSON read from `reader`,
/// one [`ChunkedResult`] per chunk.
///
/// # Errors
/// Fails if the ViewDefinition cannot be prepared.
pub fn iter_ndjson_chunks<R: BufRead>(
    view_definition: SofViewDefinition,
    reader: R,
    config: ChunkConfig,
) -> Result<NdjsonChunkIterator<R>, SofError> {
    let iterator = NdjsonChunkIterator::new(view_definition, reader, config)?;
    Ok(iterator)
}
fn parse_json_to_fhir_resource(
json: serde_json::Value,
version: FhirVersion,
) -> Result<helios_fhir::FhirResource, SofError> {
match version {
#[cfg(feature = "R4")]
FhirVersion::R4 => {
let resource: helios_fhir::r4::Resource =
serde_json::from_value(json).map_err(|e| {
SofError::InvalidSourceContent(format!("Invalid R4 resource: {}", e))
})?;
Ok(helios_fhir::FhirResource::R4(Box::new(resource)))
}
#[cfg(feature = "R4B")]
FhirVersion::R4B => {
let resource: helios_fhir::r4b::Resource =
serde_json::from_value(json).map_err(|e| {
SofError::InvalidSourceContent(format!("Invalid R4B resource: {}", e))
})?;
Ok(helios_fhir::FhirResource::R4B(Box::new(resource)))
}
#[cfg(feature = "R5")]
FhirVersion::R5 => {
let resource: helios_fhir::r5::Resource =
serde_json::from_value(json).map_err(|e| {
SofError::InvalidSourceContent(format!("Invalid R5 resource: {}", e))
})?;
Ok(helios_fhir::FhirResource::R5(Box::new(resource)))
}
#[cfg(feature = "R6")]
FhirVersion::R6 => {
let resource: helios_fhir::r6::Resource =
serde_json::from_value(json).map_err(|e| {
SofError::InvalidSourceContent(format!("Invalid R6 resource: {}", e))
})?;
Ok(helios_fhir::FhirResource::R6(Box::new(resource)))
}
}
}
/// Runs `view_definition` over `bundle` and encodes the result as `content_type`.
///
/// Steps, in order:
/// 1. optional `since` filtering of the bundle;
/// 2. evaluation;
/// 3. pagination, only when `limit` or `page` is set;
/// 4. formatting.
///
/// # Errors
/// Propagates errors from any of those steps.
pub fn run_view_definition_with_options(
    view_definition: SofViewDefinition,
    bundle: SofBundle,
    content_type: ContentType,
    options: RunOptions,
) -> Result<Vec<u8>, SofError> {
    let bundle = match options.since {
        Some(cutoff) => filter_bundle_by_since(bundle, cutoff)?,
        None => bundle,
    };
    let mut result = process_view_definition(view_definition, bundle)?;
    if options.limit.is_some() || options.page.is_some() {
        result = apply_pagination_to_result(result, options.limit, options.page)?;
    }
    format_output(result, content_type, options.parquet_options.as_ref())
}
/// Applies `view_definition` to every matching resource in `bundle` and returns the
/// resulting table in memory.
///
/// # Errors
/// - [`SofError::InvalidViewDefinition`] if the ViewDefinition and Bundle wrap different
///   FHIR versions.
/// - Any error from validating or evaluating the ViewDefinition.
pub fn process_view_definition(
view_definition: SofViewDefinition,
bundle: SofBundle,
) -> Result<ProcessedResult, SofError> {
if view_definition.version() != bundle.version() {
return Err(SofError::InvalidViewDefinition(
"ViewDefinition and Bundle must use the same FHIR version".to_string(),
));
}
match (view_definition, bundle) {
#[cfg(feature = "R4")]
(SofViewDefinition::R4(vd), SofBundle::R4(bundle)) => {
process_view_definition_generic(vd, bundle)
}
#[cfg(feature = "R4B")]
(SofViewDefinition::R4B(vd), SofBundle::R4B(bundle)) => {
process_view_definition_generic(vd, bundle)
}
#[cfg(feature = "R5")]
(SofViewDefinition::R5(vd), SofBundle::R5(bundle)) => {
process_view_definition_generic(vd, bundle)
}
#[cfg(feature = "R6")]
(SofViewDefinition::R6(vd), SofBundle::R6(bundle)) => {
process_view_definition_generic(vd, bundle)
}
// Mismatched pairs only exist as patterns when two or more versions are compiled in,
// so this arm is gated the same way; the version check above rules them out at runtime.
#[cfg(any(
all(feature = "R4", any(feature = "R4B", feature = "R5", feature = "R6")),
all(feature = "R4B", any(feature = "R5", feature = "R6")),
all(feature = "R5", feature = "R6")
))]
_ => {
unreachable!("Version mismatch should have been caught by the version check above")
}
}
}
/// Converts the ViewDefinition's constants into FHIRPath variables keyed as `%name`.
///
/// # Errors
/// Returns [`SofError::InvalidViewDefinition`] for a constant without a name, or any error
/// from converting a constant's value.
fn extract_view_definition_constants<VD: ViewDefinitionTrait>(
    view_definition: &VD,
) -> Result<HashMap<String, EvaluationResult>, SofError> {
    let Some(constants) = view_definition.constants() else {
        return Ok(HashMap::new());
    };
    constants
        .into_iter()
        .map(|constant| -> Result<(String, EvaluationResult), SofError> {
            let name = constant.name().ok_or_else(|| {
                SofError::InvalidViewDefinition("Constant name is required".to_string())
            })?;
            let key = format!("%{}", name);
            let value = constant.to_evaluation_result()?;
            Ok((key, value))
        })
        .collect::<Result<HashMap<_, _>, SofError>>()
}
/// Version-independent evaluation of a ViewDefinition over a Bundle.
///
/// The pipeline is: validate, extract constants, keep resources of the target type, apply
/// where clauses, then generate rows from the select clauses.
fn process_view_definition_generic<VD, B>(
    view_definition: VD,
    bundle: B,
) -> Result<ProcessedResult, SofError>
where
    VD: ViewDefinitionTrait,
    B: BundleTrait,
    B::Resource: ResourceTrait + Sync,
    VD::Select: Sync,
{
    validate_view_definition(&view_definition)?;
    let variables = extract_view_definition_constants(&view_definition)?;
    let Some(resource_type) = view_definition.resource() else {
        return Err(SofError::InvalidViewDefinition(
            "Resource type is required".to_string(),
        ));
    };
    let candidates = filter_resources(&bundle, resource_type)?;
    let matching = apply_where_clauses(candidates, view_definition.where_clauses(), &variables)?;
    let Some(selects) = view_definition.select() else {
        return Err(SofError::InvalidViewDefinition(
            "At least one select clause is required".to_string(),
        ));
    };
    let (columns, rows) = generate_rows_from_selects(&matching, selects, &variables)?;
    Ok(ProcessedResult { columns, rows })
}
/// Performs structural validation of a ViewDefinition.
///
/// The ViewDefinition must have a non-empty resource type and at least one select. Every
/// where clause needs a path, and every select must pass [`validate_select`].
fn validate_view_definition<VD: ViewDefinitionTrait>(view_def: &VD) -> Result<(), SofError> {
    let has_resource = view_def.resource().is_some_and(|r| !r.is_empty());
    if !has_resource {
        return Err(SofError::InvalidViewDefinition(
            "ViewDefinition must specify a resource type".to_string(),
        ));
    }
    let selects = match view_def.select() {
        Some(list) if !list.is_empty() => list,
        _ => {
            return Err(SofError::InvalidViewDefinition(
                "ViewDefinition must have at least one select".to_string(),
            ));
        }
    };
    if let Some(wheres) = view_def.where_clauses() {
        validate_where_clauses(wheres)?;
    }
    for select in selects.iter() {
        validate_select(select)?;
    }
    Ok(())
}
/// Ensures every where clause declares a FHIRPath `path`.
fn validate_where_clauses<W: ViewDefinitionWhereTrait>(
    where_clauses: &[W],
) -> Result<(), SofError> {
    let missing_path = where_clauses.iter().any(|clause| clause.path().is_none());
    if missing_path {
        Err(SofError::InvalidViewDefinition(
            "Where clause must have a path specified".to_string(),
        ))
    } else {
        Ok(())
    }
}
/// Whether a where-clause result is acceptable as a condition: a boolean, an empty result,
/// or a collection. Any other scalar type is rejected by the callers.
fn can_be_coerced_to_boolean(result: &EvaluationResult) -> bool {
    matches!(
        result,
        EvaluationResult::Boolean(..)
            | EvaluationResult::Empty
            | EvaluationResult::Collection { .. }
    )
}
/// Validates a top-level select, which is never inside a forEach/forEachOrNull scope.
fn validate_select<S: ViewDefinitionSelectTrait>(select: &S) -> Result<(), SofError> {
    let inside_for_each = false;
    validate_select_with_context(select, inside_for_each)
}
/// Recursively validates a select and its nested `select` / `unionAll` children.
///
/// `in_foreach_context` is true when an ancestor select declares `forEach` or
/// `forEachOrNull`. Outside such a scope, a column with `collection: false` is rejected.
/// `unionAll` branches must agree on their column names.
fn validate_select_with_context<S: ViewDefinitionSelectTrait>(
    select: &S,
    in_foreach_context: bool,
) -> Result<(), SofError>
where
    S::Select: ViewDefinitionSelectTrait,
{
    let opens_scope = select.for_each().is_some() || select.for_each_or_null().is_some();
    let in_scope = in_foreach_context || opens_scope;
    if !in_scope {
        let has_false_collection = select
            .column()
            .into_iter()
            .flatten()
            .any(|column| matches!(column.collection(), Some(flag) if !flag));
        if has_false_collection {
            return Err(SofError::InvalidViewDefinition(
                "Column 'collection' attribute must be true when specified".to_string(),
            ));
        }
    }
    if let Some(branches) = select.union_all() {
        validate_union_all_columns(branches)?;
    }
    for nested in select.select().into_iter().flatten() {
        validate_select_with_context(nested, in_scope)?;
    }
    for branch in select.union_all().into_iter().flatten() {
        validate_select_with_context(branch, in_scope)?;
    }
    Ok(())
}
/// Checks that every `unionAll` branch exposes exactly the same column names, in the same
/// order, as the first branch. The error says whether the names or only the order differ.
fn validate_union_all_columns<S: ViewDefinitionSelectTrait>(
    union_selects: &[S],
) -> Result<(), SofError> {
    let Some((first, rest)) = union_selects.split_first() else {
        return Ok(());
    };
    if rest.is_empty() {
        return Ok(());
    }
    let expected = get_column_names(first)?;
    for (offset, branch) in rest.iter().enumerate() {
        let index = offset + 1;
        let actual = get_column_names(branch)?;
        if actual == expected {
            continue;
        }
        let same_names = actual.len() == expected.len()
            && actual.iter().all(|name| expected.contains(name));
        return Err(if same_names {
            SofError::InvalidViewDefinition(format!(
                "UnionAll branch {} has columns in different order than first branch",
                index
            ))
        } else {
            SofError::InvalidViewDefinition(format!(
                "UnionAll branch {} has different column names than first branch",
                index
            ))
        });
    }
    Ok(())
}
/// Names of the columns declared directly on `select`. If there are none, it falls back to
/// the names of its first `unionAll` branch, recursively.
fn get_column_names<S: ViewDefinitionSelectTrait>(select: &S) -> Result<Vec<String>, SofError> {
    let names: Vec<String> = select
        .column()
        .into_iter()
        .flatten()
        .filter_map(|column| column.name().map(|n| n.to_string()))
        .collect();
    if !names.is_empty() {
        return Ok(names);
    }
    match select.union_all().and_then(|branches| branches.first()) {
        Some(first_branch) => get_column_names(first_branch),
        None => Ok(names),
    }
}
/// Borrows the bundle entries whose resource name equals `resource_type`, in bundle order.
fn filter_resources<'a, B: BundleTrait>(
    bundle: &'a B,
    resource_type: &str,
) -> Result<Vec<&'a B::Resource>, SofError> {
    let mut matching = Vec::new();
    for entry in bundle.entries() {
        if entry.resource_name() == resource_type {
            matching.push(entry);
        }
    }
    Ok(matching)
}
/// Keeps the resources for which every where clause evaluates truthy.
///
/// An absent or empty clause list keeps everything. Each resource gets one evaluation
/// context, with the ViewDefinition variables set, shared by all of its clauses. Evaluation
/// only reads the context, and the streaming path already shares a context the same way.
/// Rebuilding it per clause would repeat the resource conversion and variable cloning for
/// no benefit.
///
/// # Errors
/// - [`SofError::InvalidViewDefinition`] for a clause without a path, or one whose result
///   cannot be used as a boolean.
/// - [`SofError::FhirPathError`] when a clause fails to evaluate.
fn apply_where_clauses<'a, R, W>(
    resources: Vec<&'a R>,
    where_clauses: Option<&[W]>,
    variables: &HashMap<String, EvaluationResult>,
) -> Result<Vec<&'a R>, SofError>
where
    R: ResourceTrait,
    W: ViewDefinitionWhereTrait,
{
    let Some(wheres) = where_clauses.filter(|clauses| !clauses.is_empty()) else {
        return Ok(resources);
    };
    let mut filtered = Vec::with_capacity(resources.len());
    for resource in resources {
        let mut context = EvaluationContext::new(vec![resource.to_fhir_resource()]);
        for (name, value) in variables {
            context.set_variable_result(name, value.clone());
        }
        let mut include_resource = true;
        for where_clause in wheres {
            let path = where_clause.path().ok_or_else(|| {
                SofError::InvalidViewDefinition("Where clause path is required".to_string())
            })?;
            let result = evaluate_expression(path, &context).map_err(|e| {
                SofError::FhirPathError(format!(
                    "Error evaluating where clause '{}': {}",
                    path, e
                ))
            })?;
            if !can_be_coerced_to_boolean(&result) {
                return Err(SofError::InvalidViewDefinition(format!(
                    "Where clause path '{}' returns type '{}' which cannot be used as a boolean condition. \
                    Where clauses must return boolean values, collections, or empty results.",
                    path,
                    result.type_name()
                )));
            }
            if !is_truthy(&result) {
                include_resource = false;
                break;
            }
        }
        if include_resource {
            filtered.push(resource);
        }
    }
    Ok(filtered)
}
/// Truthiness of a where-clause result.
///
/// `false` and empty results (including empty collections) are falsy. Every other result,
/// including non-empty collections and non-boolean scalars, is truthy.
fn is_truthy(result: &EvaluationResult) -> bool {
    match result {
        EvaluationResult::Boolean(flag, _) => *flag,
        EvaluationResult::Collection { items, .. } => !items.is_empty(),
        EvaluationResult::Empty => false,
        _ => true,
    }
}
/// Converts a FHIRPath result into a JSON array, for `collection: true` columns.
///
/// Empty becomes `[]`. A collection maps each convertible item, and a single value becomes a
/// one-element array, or `[]` if it does not convert. This always returns `Some`.
fn fhirpath_result_to_json_value_collection(result: EvaluationResult) -> Option<serde_json::Value> {
    let elements: Vec<serde_json::Value> = match result {
        EvaluationResult::Empty => Vec::new(),
        EvaluationResult::Collection { items, .. } => items
            .into_iter()
            .filter_map(fhirpath_result_to_json_value)
            .collect(),
        single => fhirpath_result_to_json_value(single).into_iter().collect(),
    };
    Some(serde_json::Value::Array(elements))
}
/// Converts a single FHIRPath result into a JSON value for a non-collection column.
///
/// Returns `None` for an empty result or an empty collection. A one-item collection is
/// unwrapped; larger collections become arrays. Objects become JSON objects, dropping keys
/// whose value converts to `None`. Any variant not handled explicitly falls back to its
/// `Debug` representation as a string.
fn fhirpath_result_to_json_value(result: EvaluationResult) -> Option<serde_json::Value> {
match result {
EvaluationResult::Empty => None,
EvaluationResult::Boolean(b, _) => Some(serde_json::Value::Bool(b)),
EvaluationResult::Integer(i, _) => {
Some(serde_json::Value::Number(serde_json::Number::from(i)))
}
EvaluationResult::Decimal(d, _) => {
// Whole decimals are emitted as JSON integers when their text parses as i64.
// NOTE(review): if `d` is a `rust_decimal::Decimal`, `to_string()` keeps the scale
// (e.g. `5.0`), so this parse fails and the value is emitted as a JSON string
// instead. Confirm whether that is intended.
if d.fract().is_zero() {
if let Ok(i) = d.to_string().parse::<i64>() {
Some(serde_json::Value::Number(serde_json::Number::from(i)))
} else {
Some(serde_json::Value::String(d.to_string()))
}
} else {
// Fractional decimals go through f64; if that is not representable as a
// JSON number, the decimal's text is emitted as a string instead.
if let Ok(f) = d.to_string().parse::<f64>() {
if let Some(num) = serde_json::Number::from_f64(f) {
Some(serde_json::Value::Number(num))
} else {
Some(serde_json::Value::String(d.to_string()))
}
} else {
Some(serde_json::Value::String(d.to_string()))
}
}
}
EvaluationResult::String(s, _) => Some(serde_json::Value::String(s)),
EvaluationResult::Date(s, _) => Some(serde_json::Value::String(s)),
// Strip the FHIRPath literal prefix (`@` / `@T`) if present.
EvaluationResult::DateTime(s, _) => {
let cleaned = s.strip_prefix("@").unwrap_or(&s);
Some(serde_json::Value::String(cleaned.to_string()))
}
EvaluationResult::Time(s, _) => {
let cleaned = s.strip_prefix("@T").unwrap_or(&s);
Some(serde_json::Value::String(cleaned.to_string()))
}
EvaluationResult::Collection { items, .. } => {
if items.len() == 1 {
fhirpath_result_to_json_value(items.into_iter().next().unwrap())
} else if items.is_empty() {
None
} else {
let values: Vec<serde_json::Value> = items
.into_iter()
.filter_map(fhirpath_result_to_json_value)
.collect();
Some(serde_json::Value::Array(values))
}
}
EvaluationResult::Object { map, .. } => {
let mut json_map = serde_json::Map::new();
for (k, v) in map {
if let Some(json_val) = fhirpath_result_to_json_value(v) {
json_map.insert(k, json_val);
}
}
Some(serde_json::Value::Object(json_map))
}
// Fallback for variants without a dedicated mapping: Debug text, not a stable format.
_ => Some(serde_json::Value::String(format!("{:?}", result))),
}
}
/// Flattens a FHIRPath result into the list of items to iterate over.
///
/// A collection yields its items, an empty result yields nothing, and any single value
/// yields itself.
fn extract_iteration_items(result: EvaluationResult) -> Vec<EvaluationResult> {
    match result {
        EvaluationResult::Empty => Vec::new(),
        EvaluationResult::Collection { items, .. } => items,
        single => Vec::from([single]),
    }
}
fn generate_rows_from_selects<R, S>(
resources: &[&R],
selects: &[S],
variables: &HashMap<String, EvaluationResult>,
) -> Result<(Vec<String>, Vec<ProcessedRow>), SofError>
where
R: ResourceTrait + Sync,
S: ViewDefinitionSelectTrait + Sync,
S::Select: ViewDefinitionSelectTrait,
{
let resource_results: Result<Vec<_>, _> = resources
.par_iter()
.map(|resource| {
let mut local_columns = Vec::new();
let resource_rows =
generate_rows_for_resource(*resource, selects, &mut local_columns, variables)?;
Ok::<(Vec<String>, Vec<ProcessedRow>), SofError>((local_columns, resource_rows))
})
.collect();
let resource_results = resource_results?;
let mut final_columns = Vec::new();
let mut all_rows = Vec::new();
for (local_columns, resource_rows) in resource_results {
for col in local_columns {
if !final_columns.contains(&col) {
final_columns.push(col);
}
}
all_rows.extend(resource_rows);
}
Ok((final_columns, all_rows))
}
/// Builds an evaluation context for one resource, with the ViewDefinition variables set,
/// and generates its output rows. Column names are appended to `all_columns`.
fn generate_rows_for_resource<R, S>(
    resource: &R,
    selects: &[S],
    all_columns: &mut Vec<String>,
    variables: &HashMap<String, EvaluationResult>,
) -> Result<Vec<ProcessedRow>, SofError>
where
    R: ResourceTrait,
    S: ViewDefinitionSelectTrait,
    S::Select: ViewDefinitionSelectTrait,
{
    let mut context = EvaluationContext::new(vec![resource.to_fhir_resource()]);
    for (variable_name, variable_value) in variables.iter() {
        context.set_variable_result(variable_name, variable_value.clone());
    }
    generate_row_combinations(&context, selects, all_columns, variables)
}
/// One partially built output row while select clauses are being expanded.
///
/// `values` holds one slot per entry of the collected column list (it is created as
/// `vec![None; all_columns.len()]`), and slots are addressed by a column's position in that
/// list. A slot is `None` until some select fills it, or when the value is absent/null.
#[derive(Debug, Clone)]
struct RowCombination {
    values: Vec<Option<serde_json::Value>>,
}
/// Expands the top-level `selects` into finished rows for one evaluation context.
///
/// First appends the view's column names to `all_columns`. Then, starting from a single row
/// with every column unset, it feeds the rows produced so far through each select in order.
/// The surviving combinations are returned as `ProcessedRow`s.
fn generate_row_combinations<S>(
    context: &EvaluationContext,
    selects: &[S],
    all_columns: &mut Vec<String>,
    variables: &HashMap<String, EvaluationResult>,
) -> Result<Vec<ProcessedRow>, SofError>
where
    S: ViewDefinitionSelectTrait,
    S::Select: ViewDefinitionSelectTrait,
{
    collect_all_columns(selects, all_columns)?;
    let columns: &[String] = all_columns;

    let seed = vec![RowCombination {
        values: vec![None; columns.len()],
    }];
    let combinations = selects.iter().try_fold(seed, |rows_so_far, select| {
        expand_select_combinations(context, select, &rows_so_far, columns, variables)
    })?;

    Ok(combinations
        .into_iter()
        .map(|RowCombination { values }| ProcessedRow { values })
        .collect())
}
/// Appends every named column reachable from `selects` to `all_columns`, skipping duplicates.
///
/// Traversal is depth-first: a select's own columns come first, then its nested `select`
/// entries, then its `unionAll` branches. Unnamed columns are ignored. This currently always
/// returns `Ok(())`.
fn collect_all_columns<S>(selects: &[S], all_columns: &mut Vec<String>) -> Result<(), SofError>
where
    S: ViewDefinitionSelectTrait,
{
    for select in selects {
        if let Some(columns) = select.column() {
            for column in columns {
                match column.name() {
                    Some(name) if !all_columns.iter().any(|existing| existing == name) => {
                        all_columns.push(name.to_string());
                    }
                    _ => {}
                }
            }
        }
        if let Some(nested) = select.select() {
            collect_all_columns(nested, all_columns)?;
        }
        if let Some(branches) = select.union_all() {
            collect_all_columns(branches, all_columns)?;
        }
    }
    Ok(())
}
fn expand_select_combinations<S>(
context: &EvaluationContext,
select: &S,
existing_combinations: &[RowCombination],
all_columns: &[String],
variables: &HashMap<String, EvaluationResult>,
) -> Result<Vec<RowCombination>, SofError>
where
S: ViewDefinitionSelectTrait,
S::Select: ViewDefinitionSelectTrait,
{
if let Some(for_each_path) = select.for_each() {
return expand_for_each_combinations(
context,
select,
existing_combinations,
all_columns,
for_each_path,
false,
variables,
);
}
if let Some(for_each_or_null_path) = select.for_each_or_null() {
return expand_for_each_combinations(
context,
select,
existing_combinations,
all_columns,
for_each_or_null_path,
true,
variables,
);
}
if let Some(repeat_paths) = select.repeat() {
return expand_repeat_combinations(
context,
select,
existing_combinations,
all_columns,
&repeat_paths,
variables,
);
}
let mut new_combinations = Vec::new();
for existing_combo in existing_combinations {
let mut new_combo = existing_combo.clone();
if let Some(columns) = select.column() {
for col in columns {
if let Some(col_name) = col.name() {
if let Some(col_index) = all_columns.iter().position(|name| name == col_name) {
let path = col.path().ok_or_else(|| {
SofError::InvalidViewDefinition("Column path is required".to_string())
})?;
match evaluate_expression(path, context) {
Ok(result) => {
let is_collection = col.collection().unwrap_or(false);
new_combo.values[col_index] = if is_collection {
fhirpath_result_to_json_value_collection(result)
} else {
fhirpath_result_to_json_value(result)
};
}
Err(e) => {
return Err(SofError::FhirPathError(format!(
"Error evaluating column '{}' with path '{}': {}",
col_name, path, e
)));
}
}
}
}
}
}
new_combinations.push(new_combo);
}
if let Some(nested_selects) = select.select() {
for nested_select in nested_selects {
new_combinations = expand_select_combinations(
context,
nested_select,
&new_combinations,
all_columns,
variables,
)?;
}
}
if let Some(union_selects) = select.union_all() {
let mut union_combinations = Vec::new();
for union_select in union_selects {
let select_combinations = expand_select_combinations(
context,
union_select,
&new_combinations,
all_columns,
variables,
)?;
union_combinations.extend(select_combinations);
}
new_combinations = union_combinations;
}
Ok(new_combinations)
}
/// Expands `existing_combinations` for a select carrying `forEach` or `forEachOrNull`.
///
/// `for_each_path` is evaluated against `context`. For every resulting item (outer loop) and
/// every existing combination (inner loop), a row is built as follows:
/// 1. the combination is cloned and this select's own columns are filled from the item;
/// 2. the select's nested `select` entries are expanded from that row, in order, under a
///    context whose `this` is the item;
/// 3. if `unionAll` is present, each union branch is expanded from those rows and the
///    results are concatenated.
/// This is the same composition `expand_select_combinations` uses for plain selects, so a
/// forEach select may combine `column`, `select` and `unionAll`.
///
/// When the expression yields no items, `allow_null == false` (forEach) produces no rows.
/// `allow_null == true` (forEachOrNull) keeps one row per existing combination with this
/// select's own columns set to `None`.
///
/// # Errors
/// Returns `SofError::FhirPathError` if the forEach expression fails to evaluate, and
/// `SofError::InvalidViewDefinition` if a named column has no path. Errors from nested and
/// union expansion are propagated.
fn expand_for_each_combinations<S>(
    context: &EvaluationContext,
    select: &S,
    existing_combinations: &[RowCombination],
    all_columns: &[String],
    for_each_path: &str,
    allow_null: bool,
    variables: &HashMap<String, EvaluationResult>,
) -> Result<Vec<RowCombination>, SofError>
where
    S: ViewDefinitionSelectTrait,
    S::Select: ViewDefinitionSelectTrait,
{
    let for_each_result = evaluate_expression(for_each_path, context).map_err(|e| {
        SofError::FhirPathError(format!(
            "Error evaluating forEach expression '{}': {}",
            for_each_path, e
        ))
    })?;
    let iteration_items = extract_iteration_items(for_each_result);

    if iteration_items.is_empty() {
        if !allow_null {
            return Ok(Vec::new());
        }
        // forEachOrNull over an empty collection: keep each row, with this select's columns null.
        let mut null_combinations = Vec::with_capacity(existing_combinations.len());
        for existing_combo in existing_combinations {
            let mut new_combo = existing_combo.clone();
            if let Some(columns) = select.column() {
                for col in columns {
                    if let Some(col_index) = col
                        .name()
                        .and_then(|col_name| all_columns.iter().position(|name| name == col_name))
                    {
                        new_combo.values[col_index] = None;
                    }
                }
            }
            null_combinations.push(new_combo);
        }
        return Ok(null_combinations);
    }

    // Writes this select's own columns into `combo`, evaluating each path against `item`.
    // `$this` short-circuits to the item itself.
    let fill_item_columns =
        |combo: &mut RowCombination, item: &EvaluationResult| -> Result<(), SofError> {
            if let Some(columns) = select.column() {
                for col in columns {
                    if let Some(col_name) = col.name() {
                        if let Some(col_index) =
                            all_columns.iter().position(|name| name == col_name)
                        {
                            let path = col.path().ok_or_else(|| {
                                SofError::InvalidViewDefinition(
                                    "Column path is required".to_string(),
                                )
                            })?;
                            let result = if path == "$this" {
                                item.clone()
                            } else {
                                evaluate_path_on_item(path, item, variables)?
                            };
                            combo.values[col_index] = if col.collection().unwrap_or(false) {
                                fhirpath_result_to_json_value_collection(result)
                            } else {
                                fhirpath_result_to_json_value(result)
                            };
                        }
                    }
                }
            }
            Ok(())
        };

    let mut new_combinations = Vec::new();
    for item in &iteration_items {
        let item_context = create_iteration_context(item, variables);
        for existing_combo in existing_combinations {
            let mut base_combo = existing_combo.clone();
            fill_item_columns(&mut base_combo, item)?;
            let mut item_combinations = vec![base_combo];

            // Each nested select is expanded from the rows produced so far for this item.
            if let Some(nested_selects) = select.select() {
                for nested_select in nested_selects {
                    item_combinations = expand_select_combinations(
                        &item_context,
                        nested_select,
                        &item_combinations,
                        all_columns,
                        variables,
                    )?;
                }
            }

            // Each unionAll branch extends the rows built above; the branch outputs are
            // concatenated in order.
            if let Some(union_selects) = select.union_all() {
                let mut union_combinations = Vec::new();
                for union_select in union_selects {
                    union_combinations.extend(expand_select_combinations(
                        &item_context,
                        union_select,
                        &item_combinations,
                        all_columns,
                        variables,
                    )?);
                }
                item_combinations = union_combinations;
            }

            new_combinations.extend(item_combinations);
        }
    }
    Ok(new_combinations)
}
/// Expands `existing_combinations` for a select carrying `repeat`.
///
/// For each existing combination and each repeat path, the path is evaluated against
/// `context`, and each resulting child node produces:
/// - a row with this select's columns filled from the child, further expanded by the
///   select's nested `select` entries under a context whose `this` is the child; followed by
/// - the rows produced by recursively applying the same repeat paths to that child.
/// Output is therefore depth-first, with each node's rows before its descendants' rows.
///
/// # Errors
/// Returns `SofError::FhirPathError` if a repeat expression fails, and
/// `SofError::InvalidViewDefinition` if a named column has no path. Nested errors propagate.
fn expand_repeat_combinations<S>(
    context: &EvaluationContext,
    select: &S,
    existing_combinations: &[RowCombination],
    all_columns: &[String],
    repeat_paths: &[&str],
    variables: &HashMap<String, EvaluationResult>,
) -> Result<Vec<RowCombination>, SofError>
where
    S: ViewDefinitionSelectTrait,
    S::Select: ViewDefinitionSelectTrait,
{
    // Writes this select's own columns into `combo`, evaluated against one child node.
    let fill_columns =
        |combo: &mut RowCombination, node: &EvaluationResult| -> Result<(), SofError> {
            if let Some(columns) = select.column() {
                for col in columns {
                    let slot = col
                        .name()
                        .and_then(|col_name| all_columns.iter().position(|name| name == col_name));
                    if let Some(slot) = slot {
                        let path = col.path().ok_or_else(|| {
                            SofError::InvalidViewDefinition("Column path is required".to_string())
                        })?;
                        let evaluated = match path {
                            "$this" => node.clone(),
                            _ => evaluate_path_on_item(path, node, variables)?,
                        };
                        combo.values[slot] = match col.collection().unwrap_or(false) {
                            true => fhirpath_result_to_json_value_collection(evaluated),
                            false => fhirpath_result_to_json_value(evaluated),
                        };
                    }
                }
            }
            Ok(())
        };

    let mut collected = Vec::new();
    for base in existing_combinations {
        for &repeat_path in repeat_paths {
            let evaluated = evaluate_expression(repeat_path, context).map_err(|e| {
                SofError::FhirPathError(format!(
                    "Error evaluating repeat expression '{}': {}",
                    repeat_path, e
                ))
            })?;
            for child in &extract_iteration_items(evaluated) {
                let mut child_combo = base.clone();
                fill_columns(&mut child_combo, child)?;
                let child_context = create_iteration_context(child, variables);

                let mut expanded = vec![child_combo.clone()];
                if let Some(nested_selects) = select.select() {
                    for nested_select in nested_selects {
                        expanded = expand_select_combinations(
                            &child_context,
                            nested_select,
                            &expanded,
                            all_columns,
                            variables,
                        )?;
                    }
                }
                collected.extend(expanded);

                let descendants = expand_repeat_combinations(
                    &child_context,
                    select,
                    &[child_combo],
                    all_columns,
                    repeat_paths,
                    variables,
                )?;
                collected.extend(descendants);
            }
        }
    }
    Ok(collected)
}
/// Evaluates a column `path` against a single iteration item, best-effort.
///
/// An `Object` item becomes `this` of a fresh context, while any other item gets a context
/// with no `this`. `variables` are installed in both cases. Evaluation errors are not
/// propagated:
/// - for an `Object` item, the path is retried as a literal key of the object's map
///   (`Empty` if absent);
/// - otherwise the result is `Empty`.
/// As written this never returns `Err`.
fn evaluate_path_on_item(
    path: &str,
    item: &EvaluationResult,
    variables: &HashMap<String, EvaluationResult>,
) -> Result<EvaluationResult, SofError> {
    let mut scratch = EvaluationContext::new(Vec::new());
    if let EvaluationResult::Object { .. } = item {
        scratch.this = Some(item.clone());
    }
    for (var_name, var_value) in variables {
        scratch.set_variable_result(var_name, var_value.clone());
    }
    let evaluated = evaluate_expression(path, &scratch).unwrap_or_else(|_| {
        if let EvaluationResult::Object { map, .. } = item {
            map.get(path).cloned().unwrap_or(EvaluationResult::Empty)
        } else {
            EvaluationResult::Empty
        }
    });
    Ok(evaluated)
}
/// Builds an evaluation context whose `this` is `item` and which has `variables` installed.
///
/// The context is created with no input resources.
fn create_iteration_context(
    item: &EvaluationResult,
    variables: &HashMap<String, EvaluationResult>,
) -> EvaluationContext {
    let mut iteration_context = EvaluationContext::new(Vec::new());
    iteration_context.this = Some(item.clone());
    variables.iter().for_each(|(var_name, var_value)| {
        iteration_context.set_variable_result(var_name, var_value.clone());
    });
    iteration_context
}
fn filter_bundle_by_since(bundle: SofBundle, since: DateTime<Utc>) -> Result<SofBundle, SofError> {
match bundle {
#[cfg(feature = "R4")]
SofBundle::R4(mut b) => {
if let Some(entries) = b.entry.as_mut() {
entries.retain(|entry| {
entry
.resource
.as_ref()
.and_then(|r| r.get_last_updated())
.map(|last_updated| last_updated > since)
.unwrap_or(false)
});
}
Ok(SofBundle::R4(b))
}
#[cfg(feature = "R4B")]
SofBundle::R4B(mut b) => {
if let Some(entries) = b.entry.as_mut() {
entries.retain(|entry| {
entry
.resource
.as_ref()
.and_then(|r| r.get_last_updated())
.map(|last_updated| last_updated > since)
.unwrap_or(false)
});
}
Ok(SofBundle::R4B(b))
}
#[cfg(feature = "R5")]
SofBundle::R5(mut b) => {
if let Some(entries) = b.entry.as_mut() {
entries.retain(|entry| {
entry
.resource
.as_ref()
.and_then(|r| r.get_last_updated())
.map(|last_updated| last_updated > since)
.unwrap_or(false)
});
}
Ok(SofBundle::R5(b))
}
#[cfg(feature = "R6")]
SofBundle::R6(mut b) => {
if let Some(entries) = b.entry.as_mut() {
entries.retain(|entry| {
entry
.resource
.as_ref()
.and_then(|r| r.get_last_updated())
.map(|last_updated| last_updated > since)
.unwrap_or(false)
});
}
Ok(SofBundle::R6(b))
}
}
}
/// Restricts `result.rows` to one page when `limit` is given.
///
/// Pages are 1-based (`page` defaults to 1) and hold up to `limit` rows. A page that starts
/// past the end yields no rows. When `limit` is `None`, the result is returned unchanged.
/// Offset arithmetic saturates, so huge `page`/`limit` values produce an empty page instead
/// of overflowing.
///
/// # Errors
/// Returns `SofError::InvalidViewDefinition` when `limit` is set and `page` is `Some(0)`.
fn apply_pagination_to_result(
    mut result: ProcessedResult,
    limit: Option<usize>,
    page: Option<usize>,
) -> Result<ProcessedResult, SofError> {
    if let Some(limit) = limit {
        let page_num = page.unwrap_or(1);
        if page_num == 0 {
            return Err(SofError::InvalidViewDefinition(
                "Page number must be greater than 0".to_string(),
            ));
        }
        // Both values are caller-controlled; saturate rather than overflow.
        let start_index = (page_num - 1).saturating_mul(limit);
        if start_index >= result.rows.len() {
            result.rows.clear();
        } else {
            let end_index = start_index.saturating_add(limit).min(result.rows.len());
            // Trim in place instead of cloning the page into a new Vec.
            result.rows.truncate(end_index);
            result.rows.drain(..start_index);
        }
    }
    Ok(result)
}
/// Serializes `result` in the requested output format.
///
/// `parquet_options` is only consulted for `ContentType::Parquet`.
fn format_output(
    result: ProcessedResult,
    content_type: ContentType,
    parquet_options: Option<&ParquetOptions>,
) -> Result<Vec<u8>, SofError> {
    match content_type {
        ContentType::Csv => format_csv(result, false),
        ContentType::CsvWithHeader => format_csv(result, true),
        ContentType::Json => format_json(result),
        ContentType::NdJson => format_ndjson(result),
        ContentType::Parquet => format_parquet(result, parquet_options),
    }
}
/// Renders `result` as CSV bytes, optionally preceded by a header row of column names.
///
/// Cell encoding:
/// - unset (`None`) cells become empty fields;
/// - JSON strings are written as their raw text;
/// - any other JSON value is written as compact JSON (an encoding failure yields an empty
///   field).
///
/// # Errors
/// Propagates CSV write errors and maps a failure to recover the buffer to
/// `SofError::CsvWriterError`.
fn format_csv(result: ProcessedResult, include_header: bool) -> Result<Vec<u8>, SofError> {
    let mut writer = csv::Writer::from_writer(Vec::new());
    if include_header {
        writer.write_record(&result.columns)?;
    }
    for row in &result.rows {
        let fields = row.values.iter().map(|cell| match cell {
            None => String::new(),
            Some(serde_json::Value::String(text)) => text.clone(),
            Some(other) => serde_json::to_string(other).unwrap_or_default(),
        });
        writer.write_record(fields)?;
    }
    writer
        .into_inner()
        .map_err(|err| SofError::CsvWriterError(err.to_string()))
}
/// Renders `result` as a pretty-printed JSON array with one object per row.
///
/// Each object maps column name to cell value. Unset cells, or cells missing from a short
/// row, become JSON `null`.
fn format_json(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
    let objects: Vec<serde_json::Value> = result
        .rows
        .iter()
        .map(|row| {
            let record: serde_json::Map<String, serde_json::Value> = result
                .columns
                .iter()
                .enumerate()
                .map(|(idx, name)| {
                    let cell = row
                        .values
                        .get(idx)
                        .cloned()
                        .flatten()
                        .unwrap_or(serde_json::Value::Null);
                    (name.clone(), cell)
                })
                .collect();
            serde_json::Value::Object(record)
        })
        .collect();
    Ok(serde_json::to_vec_pretty(&objects)?)
}
/// Renders `result` as newline-delimited JSON: one compact object per row, each followed by `\n`.
///
/// Each object maps column name to cell value. Unset cells, or cells missing from a short
/// row, become JSON `null`.
fn format_ndjson(result: ProcessedResult) -> Result<Vec<u8>, SofError> {
    let mut buffer = Vec::new();
    for row in &result.rows {
        let record: serde_json::Map<String, serde_json::Value> = result
            .columns
            .iter()
            .enumerate()
            .map(|(idx, name)| {
                let cell = row
                    .values
                    .get(idx)
                    .cloned()
                    .flatten()
                    .unwrap_or(serde_json::Value::Null);
                (name.clone(), cell)
            })
            .collect();
        serde_json::to_writer(&mut buffer, &serde_json::Value::Object(record))?;
        buffer.push(b'\n');
    }
    Ok(buffer)
}
/// Serializes `result` into a single in-memory Parquet file.
///
/// The Arrow schema comes from `parquet_schema::create_arrow_schema`. Rows are converted and
/// written in batches whose row count is estimated as follows:
/// - take `row_group_size_mb` from the options;
/// - divide it by the average compact-JSON size of (up to) the first 100 rows, with a floor
///   of 50 bytes per row;
/// - clamp the result to 100k–500k rows.
/// The same row count caps each row group. Compression comes from
/// `ParquetOptions::compression` ("none", "gzip", "lz4", "brotli", "zstd"; anything else
/// selects Snappy). `ParquetOptions::default()` is used when `options` is `None`.
///
/// # Errors
/// Returns `SofError::ParquetConversionError` if the writer cannot be created, a batch cannot
/// be built or written, or the file cannot be finalized. Errors from `parquet_schema` are
/// propagated unchanged.
fn format_parquet(
    result: ProcessedResult,
    options: Option<&ParquetOptions>,
) -> Result<Vec<u8>, SofError> {
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
    use parquet::file::properties::WriterProperties;
    use std::io::Cursor;

    // Lower/upper bounds on rows per batch, and therefore per row group.
    const TARGET_ROWS_PER_BATCH: usize = 100_000;
    const MAX_ROWS_PER_BATCH: usize = 500_000;

    let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
    let schema_ref = std::sync::Arc::new(schema.clone());
    let parquet_opts = options.cloned().unwrap_or_default();
    let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
    let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;

    // Estimate bytes per row from the JSON text size of a small sample.
    let sample_size = std::cmp::min(100, result.rows.len());
    let mut estimated_row_size = 100;
    if sample_size > 0 {
        let sample_json_size: usize = result.rows[..sample_size]
            .iter()
            .map(|row| {
                row.values
                    .iter()
                    .filter_map(|v| v.as_ref())
                    .map(|v| v.to_string().len())
                    .sum::<usize>()
            })
            .sum();
        estimated_row_size = (sample_json_size / sample_size).max(50);
    }
    let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
        .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);

    let compression = match parquet_opts.compression.as_str() {
        "none" => Compression::UNCOMPRESSED,
        "gzip" => Compression::GZIP(GzipLevel::default()),
        "lz4" => Compression::LZ4,
        "brotli" => Compression::BROTLI(BrotliLevel::default()),
        "zstd" => Compression::ZSTD(ZstdLevel::default()),
        _ => Compression::SNAPPY,
    };
    let props = WriterProperties::builder()
        .set_compression(compression)
        // `set_max_row_group_size` takes a ROW count. Use the byte target converted to rows;
        // passing the byte target directly would make row groups effectively unbounded.
        .set_max_row_group_size(optimal_batch_size)
        .set_data_page_row_count_limit(20_000)
        .set_data_page_size_limit(target_page_size_bytes)
        .set_write_batch_size(8192)
        .build();

    let mut buffer = Vec::new();
    let mut cursor = Cursor::new(&mut buffer);
    let mut writer =
        ArrowWriter::try_new(&mut cursor, schema_ref.clone(), Some(props)).map_err(|e| {
            SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
        })?;

    let mut row_offset = 0;
    while row_offset < result.rows.len() {
        let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
        let batch_rows = &result.rows[row_offset..batch_end];
        let batch_arrays =
            parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;
        let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
            SofError::ParquetConversionError(format!(
                "Failed to create RecordBatch for rows {}-{}: {}",
                row_offset, batch_end, e
            ))
        })?;
        writer.write(&batch).map_err(|e| {
            SofError::ParquetConversionError(format!(
                "Failed to write RecordBatch for rows {}-{}: {}",
                row_offset, batch_end, e
            ))
        })?;
        row_offset = batch_end;
    }
    writer.close().map_err(|e| {
        SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
    })?;
    Ok(buffer)
}
/// Serializes `result` into one or more in-memory Parquet files of roughly bounded size.
///
/// Batch sizing, row-group sizing and compression follow the same rules as single-file
/// Parquet output. Each batch's row count also caps a row group:
/// - it is derived from `row_group_size_mb` and the average JSON size of a row sample;
/// - it is clamped to 100k–500k rows.
///
/// After each batch is written, the current file is closed and a new one started if the
/// writer's `bytes_written()` has reached `max_file_size_bytes` and rows remain. Every
/// returned buffer is a complete Parquet file sharing the same schema. At least one buffer
/// is always returned.
///
/// Note: `bytes_written()` only reflects row groups already flushed to the sink. Rows still
/// buffered in an open row group are not counted, which is why the row-group limit must be
/// a real row count.
///
/// # Errors
/// Returns `SofError::ParquetConversionError` if a writer cannot be created, a batch cannot
/// be built or written, or a file cannot be finalized. Errors from `parquet_schema` are
/// propagated unchanged.
pub fn format_parquet_multi_file(
    result: ProcessedResult,
    options: Option<&ParquetOptions>,
    max_file_size_bytes: usize,
) -> Result<Vec<Vec<u8>>, SofError> {
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::ArrowWriter;
    use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
    use parquet::file::properties::WriterProperties;
    use std::io::Cursor;

    // Lower/upper bounds on rows per batch, and therefore per row group.
    const TARGET_ROWS_PER_BATCH: usize = 100_000;
    const MAX_ROWS_PER_BATCH: usize = 500_000;

    let schema = parquet_schema::create_arrow_schema(&result.columns, &result.rows)?;
    let schema_ref = std::sync::Arc::new(schema.clone());
    let parquet_opts = options.cloned().unwrap_or_default();
    let target_row_group_size_bytes = (parquet_opts.row_group_size_mb as usize) * 1024 * 1024;
    let target_page_size_bytes = (parquet_opts.page_size_kb as usize) * 1024;

    // Estimate bytes per row from the JSON text size of a small sample.
    let sample_size = std::cmp::min(100, result.rows.len());
    let mut estimated_row_size = 100;
    if sample_size > 0 {
        let sample_json_size: usize = result.rows[..sample_size]
            .iter()
            .map(|row| {
                row.values
                    .iter()
                    .filter_map(|v| v.as_ref())
                    .map(|v| v.to_string().len())
                    .sum::<usize>()
            })
            .sum();
        estimated_row_size = (sample_json_size / sample_size).max(50);
    }
    let optimal_batch_size = (target_row_group_size_bytes / estimated_row_size)
        .clamp(TARGET_ROWS_PER_BATCH, MAX_ROWS_PER_BATCH);

    let compression = match parquet_opts.compression.as_str() {
        "none" => Compression::UNCOMPRESSED,
        "gzip" => Compression::GZIP(GzipLevel::default()),
        "lz4" => Compression::LZ4,
        "brotli" => Compression::BROTLI(BrotliLevel::default()),
        "zstd" => Compression::ZSTD(ZstdLevel::default()),
        _ => Compression::SNAPPY,
    };
    let props = WriterProperties::builder()
        .set_compression(compression)
        // `set_max_row_group_size` takes a ROW count. With a row-count limit equal to the
        // batch size, each full batch is flushed as its own row group, so `bytes_written()`
        // below grows and the size-based file split can trigger.
        .set_max_row_group_size(optimal_batch_size)
        .set_data_page_row_count_limit(20_000)
        .set_data_page_size_limit(target_page_size_bytes)
        .set_write_batch_size(8192)
        .build();

    let mut file_buffers = Vec::new();
    let mut current_buffer = Vec::new();
    let mut current_cursor = Cursor::new(&mut current_buffer);
    let mut current_writer =
        ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
            .map_err(|e| {
                SofError::ParquetConversionError(format!("Failed to create Parquet writer: {}", e))
            })?;

    let mut row_offset = 0;
    while row_offset < result.rows.len() {
        let batch_end = (row_offset + optimal_batch_size).min(result.rows.len());
        let batch_rows = &result.rows[row_offset..batch_end];
        let batch_arrays =
            parquet_schema::process_to_arrow_arrays(&schema, &result.columns, batch_rows)?;
        let batch = RecordBatch::try_new(schema_ref.clone(), batch_arrays).map_err(|e| {
            SofError::ParquetConversionError(format!(
                "Failed to create RecordBatch for rows {}-{}: {}",
                row_offset, batch_end, e
            ))
        })?;
        current_writer.write(&batch).map_err(|e| {
            SofError::ParquetConversionError(format!(
                "Failed to write RecordBatch for rows {}-{}: {}",
                row_offset, batch_end, e
            ))
        })?;
        row_offset = batch_end;

        // Start a new file once the current one is large enough, unless this was the last batch.
        let current_size = current_writer.bytes_written();
        if current_size >= max_file_size_bytes && row_offset < result.rows.len() {
            current_writer.close().map_err(|e| {
                SofError::ParquetConversionError(format!("Failed to close Parquet writer: {}", e))
            })?;
            file_buffers.push(current_buffer);
            current_buffer = Vec::new();
            current_cursor = Cursor::new(&mut current_buffer);
            current_writer =
                ArrowWriter::try_new(&mut current_cursor, schema_ref.clone(), Some(props.clone()))
                    .map_err(|e| {
                        SofError::ParquetConversionError(format!(
                            "Failed to create new Parquet writer: {}",
                            e
                        ))
                    })?;
        }
    }
    current_writer.close().map_err(|e| {
        SofError::ParquetConversionError(format!("Failed to close final Parquet writer: {}", e))
    })?;
    file_buffers.push(current_buffer);
    Ok(file_buffers)
}