use axum::{
Json,
extract::Query,
http::{HeaderMap, StatusCode, header},
response::{IntoResponse, Response},
};
use chrono::{DateTime, Utc};
use helios_sof::{
ContentType, RunOptions, SofBundle, SofViewDefinition,
data_source::{DataSource, UniversalDataSource},
format_parquet_multi_file, get_fhir_version_string, get_newest_enabled_fhir_version,
process_view_definition, run_view_definition_with_options,
};
use tracing::{debug, info};
use super::{
error::{ServerError, ServerResult},
models::{
RunParameters, RunQueryParams, apply_result_filtering, extract_all_parameters,
parse_content_type, validate_query_params,
},
};
pub async fn capability_statement() -> ServerResult<impl IntoResponse> {
info!("Handling CapabilityStatement request");
let capability_statement = create_capability_statement();
Ok((
StatusCode::OK,
[(header::CONTENT_TYPE, "application/fhir+json")],
Json(capability_statement),
))
}
pub async fn run_view_definition_handler(
Query(params): Query<RunQueryParams>,
headers: HeaderMap,
Json(body): Json<serde_json::Value>,
) -> ServerResult<Response> {
info!("Handling ViewDefinition/$viewdefinition-run request");
debug!("Query params: {:?}", params);
let accept_header = headers.get(header::ACCEPT).and_then(|h| h.to_str().ok());
let validated_params =
validate_query_params(¶ms, accept_header).map_err(ServerError::BadRequest)?;
let parameters = parse_parameters(body)?;
let extracted_params = extract_all_parameters(parameters).map_err(ServerError::BadRequest)?;
if extracted_params.view_reference.is_some() {
return Err(ServerError::NotImplemented(
"The viewReference parameter is not yet implemented. Please provide the ViewDefinition directly using the viewResource parameter.".to_string()
));
}
if extracted_params.group.is_some() {
return Err(ServerError::NotImplemented(
"The group parameter is not yet implemented.".to_string(),
));
}
let view_def_json = extracted_params.view_definition;
let resources_json = if extracted_params.resources.is_empty() {
None
} else {
Some(extracted_params.resources)
};
let format_from_body = extracted_params.format;
let header_from_body = extracted_params.header;
let view_def_json = view_def_json
.ok_or_else(|| ServerError::BadRequest("No ViewDefinition provided".to_string()))?;
let mut validated_params = validated_params;
if let Some(format_str) = format_from_body {
let header_param = if let Some(h) = header_from_body {
Some(h)
} else {
match params.header.as_deref() {
Some("true") => Some(true),
Some("false") => Some(false),
_ => None,
}
};
let content_type = parse_content_type(
None, Some(&format_str),
header_param,
)?;
validated_params.format = content_type;
} else if let Some(header_bool) = header_from_body {
let format_str = match validated_params.format {
ContentType::Csv | ContentType::CsvWithHeader => "text/csv",
_ => {
return Err(ServerError::BadRequest(
"Header parameter only applies to CSV format".to_string(),
));
}
};
let content_type = parse_content_type(None, Some(format_str), Some(header_bool))?;
validated_params.format = content_type;
}
let mut filtered_resources = resources_json.unwrap_or_default();
let patient_filter = extracted_params
.patient
.or(validated_params.patient.clone());
let group_filter = extracted_params.group.or(validated_params.group.clone());
let source_param = extracted_params.source.or(validated_params.source.clone());
if let Some(limit) = extracted_params.limit {
validated_params.limit = Some(limit as usize);
}
if let Some(since_str) = extracted_params.since {
match DateTime::parse_from_rfc3339(&since_str) {
Ok(dt) => validated_params.since = Some(dt.with_timezone(&Utc)),
Err(_) => {
return Err(ServerError::BadRequest(format!(
"_since parameter must be a valid RFC3339 timestamp: {}",
since_str
)));
}
}
}
if extracted_params.max_file_size.is_some()
|| extracted_params.row_group_size.is_some()
|| extracted_params.page_size.is_some()
|| extracted_params.compression.is_some()
{
let mut parquet_opts = validated_params.parquet_options.clone().unwrap_or_else(|| {
helios_sof::ParquetOptions {
row_group_size_mb: 256,
page_size_kb: 1024,
compression: "snappy".to_string(),
max_file_size_mb: None,
}
});
if let Some(max_size) = extracted_params.max_file_size {
parquet_opts.max_file_size_mb = Some(max_size);
}
if let Some(row_group) = extracted_params.row_group_size {
parquet_opts.row_group_size_mb = row_group;
}
if let Some(page_size) = extracted_params.page_size {
parquet_opts.page_size_kb = page_size;
}
if let Some(compression) = extracted_params.compression {
parquet_opts.compression = compression;
}
validated_params.parquet_options = Some(parquet_opts);
}
let mut source_bundle = None;
let mut source_fhir_version = None;
if let Some(source) = &source_param {
info!("Loading data from source: {}", source);
let data_source = UniversalDataSource::new();
let loaded_bundle = data_source.load(source).await?;
source_fhir_version = Some(loaded_bundle.version());
let loaded_bundle = if patient_filter.is_some()
|| group_filter.is_some()
|| validated_params.since.is_some()
{
let mut source_resources = extract_resources_from_bundle(&loaded_bundle)?;
if patient_filter.is_some() || group_filter.is_some() {
source_resources = filter_resources_by_patient_and_group(
source_resources,
patient_filter.as_deref(),
group_filter.as_deref(),
)?;
}
if let Some(since) = validated_params.since {
source_resources = filter_resources_by_since(source_resources, since)?;
}
create_bundle_from_resources_for_version(
source_resources,
source_fhir_version.unwrap(),
)?
} else {
loaded_bundle
};
source_bundle = Some(loaded_bundle);
}
let view_definition = if let Some(version) = source_fhir_version {
info!(
"Parsing ViewDefinition as {:?} (matching source bundle)",
version
);
parse_view_definition_for_version(view_def_json, version)?
} else {
parse_view_definition(view_def_json)?
};
if patient_filter.is_some() || group_filter.is_some() {
filtered_resources = filter_resources_by_patient_and_group(
filtered_resources,
patient_filter.as_deref(),
group_filter.as_deref(),
)?;
}
if let Some(since) = validated_params.since {
filtered_resources = filter_resources_by_since(filtered_resources, since)?;
}
let bundle = if let Some(source_bundle) = source_bundle {
if filtered_resources.is_empty() {
source_bundle
} else {
merge_bundles(source_bundle, filtered_resources)?
}
} else {
create_bundle_from_resources(filtered_resources)?
};
let run_options = RunOptions {
since: validated_params.since,
limit: validated_params.limit,
page: None, parquet_options: validated_params.parquet_options.clone(),
};
info!(
"Executing ViewDefinition with output format: {:?}",
validated_params.format
);
if validated_params.format == ContentType::Parquet
&& validated_params
.parquet_options
.as_ref()
.and_then(|opts| opts.max_file_size_mb)
.is_some()
{
let processed_result = process_view_definition(view_definition, bundle)?;
let max_file_size_bytes = validated_params
.parquet_options
.as_ref()
.and_then(|opts| opts.max_file_size_mb)
.map(|mb| mb as usize * 1024 * 1024)
.unwrap_or(usize::MAX);
let file_buffers = format_parquet_multi_file(
processed_result,
validated_params.parquet_options.as_ref(),
max_file_size_bytes,
)?;
if file_buffers.len() > 1 {
info!(
"Generating ZIP archive with {} Parquet files",
file_buffers.len()
);
crate::streaming::stream_parquet_zip_response(file_buffers, "data")
} else {
let file_size = file_buffers[0].len();
if crate::streaming::should_use_streaming(file_size) {
info!("Streaming single Parquet file ({} bytes)", file_size);
crate::streaming::stream_single_parquet_response(file_buffers[0].clone())
} else {
Ok((
StatusCode::OK,
[(header::CONTENT_TYPE, "application/parquet")],
file_buffers[0].clone(),
)
.into_response())
}
}
} else {
let output = run_view_definition_with_options(
view_definition,
bundle,
validated_params.format,
run_options,
)?;
let filtered_output = apply_result_filtering(output, &validated_params)
.map_err(|e| ServerError::InternalError(format!("Failed to apply filtering: {}", e)))?;
let mime_type = match validated_params.format {
ContentType::Csv | ContentType::CsvWithHeader => "text/csv",
ContentType::Json => "application/json",
ContentType::NdJson => "application/x-ndjson",
ContentType::Parquet => "application/parquet",
};
Ok((
StatusCode::OK,
[(header::CONTENT_TYPE, mime_type)],
filtered_output,
)
.into_response())
}
}
fn create_capability_statement() -> serde_json::Value {
let fhir_version = get_fhir_version_string();
serde_json::json!({
"resourceType": "CapabilityStatement",
"id": "sof-server",
"name": "SQL-on-FHIR Server",
"title": "SQL-on-FHIR Server CapabilityStatement",
"status": "active",
"date": chrono::Utc::now().to_rfc3339(),
"publisher": "SQL-on-FHIR Implementation",
"kind": "instance",
"software": {
"name": "sof-server",
"version": env!("CARGO_PKG_VERSION")
},
"implementation": {
"description": "SQL-on-FHIR ViewDefinition Runner",
"url": "http://localhost:8080"
},
"fhirVersion": fhir_version,
"format": ["json"],
"rest": [{
"mode": "server",
"operation": [{
"name": "viewdefinition-run",
"definition": "http://sql-on-fhir.org/OperationDefinition/$viewdefinition-run",
"documentation": "Execute a ViewDefinition to transform FHIR resources into tabular format. Supports CSV, JSON, and NDJSON output formats. This is a type-level operation invoked at /ViewDefinition/$viewdefinition-run"
}]
}]
})
}
#[allow(dead_code)]
fn resolve_view_reference(reference: &str) -> ServerResult<SofViewDefinition> {
info!("Resolving ViewDefinition reference: {}", reference);
if !reference.starts_with("http://") && !reference.starts_with("https://") {
return Err(ServerError::NotImplemented(format!(
"Relative ViewDefinition references are not supported in this stateless implementation: {}",
reference
)));
}
if reference.contains('|') {
return Err(ServerError::NotImplemented(format!(
"Canonical URL references with versions are not yet supported: {}",
reference
)));
}
Err(ServerError::NotImplemented(format!(
"Loading ViewDefinitions from external URLs is not yet implemented: {}",
reference
)))
}
fn parse_view_definition(json: serde_json::Value) -> ServerResult<SofViewDefinition> {
parse_view_definition_for_version(json, get_newest_enabled_fhir_version())
}
fn parse_view_definition_for_version(
json: serde_json::Value,
version: helios_fhir::FhirVersion,
) -> ServerResult<SofViewDefinition> {
match version {
#[cfg(feature = "R4")]
helios_fhir::FhirVersion::R4 => {
let view_def: helios_fhir::r4::ViewDefinition =
serde_json::from_value(json).map_err(|e| {
ServerError::BadRequest(format!("Invalid R4 ViewDefinition: {}", e))
})?;
Ok(SofViewDefinition::R4(view_def))
}
#[cfg(feature = "R4B")]
helios_fhir::FhirVersion::R4B => {
let view_def: helios_fhir::r4b::ViewDefinition =
serde_json::from_value(json).map_err(|e| {
ServerError::BadRequest(format!("Invalid R4B ViewDefinition: {}", e))
})?;
Ok(SofViewDefinition::R4B(view_def))
}
#[cfg(feature = "R5")]
helios_fhir::FhirVersion::R5 => {
let view_def: helios_fhir::r5::ViewDefinition =
serde_json::from_value(json).map_err(|e| {
ServerError::BadRequest(format!("Invalid R5 ViewDefinition: {}", e))
})?;
Ok(SofViewDefinition::R5(view_def))
}
#[cfg(feature = "R6")]
helios_fhir::FhirVersion::R6 => {
let view_def: helios_fhir::r6::ViewDefinition =
serde_json::from_value(json).map_err(|e| {
ServerError::BadRequest(format!("Invalid R6 ViewDefinition: {}", e))
})?;
Ok(SofViewDefinition::R6(view_def))
}
}
}
fn parse_parameters(json: serde_json::Value) -> ServerResult<RunParameters> {
if let Some(resource_type) = json.get("resourceType") {
if resource_type != "Parameters" {
return Err(ServerError::BadRequest(
"Request body must be a Parameters resource".to_string(),
));
}
} else {
return Err(ServerError::BadRequest(
"Missing resourceType field".to_string(),
));
}
let newest_version = get_newest_enabled_fhir_version();
match newest_version {
#[cfg(feature = "R4")]
helios_fhir::FhirVersion::R4 => {
let params: helios_fhir::r4::Parameters = serde_json::from_value(json)
.map_err(|e| ServerError::BadRequest(format!("Invalid R4 Parameters: {}", e)))?;
Ok(RunParameters::R4(params))
}
#[cfg(feature = "R4B")]
helios_fhir::FhirVersion::R4B => {
let params: helios_fhir::r4b::Parameters = serde_json::from_value(json)
.map_err(|e| ServerError::BadRequest(format!("Invalid R4B Parameters: {}", e)))?;
Ok(RunParameters::R4B(params))
}
#[cfg(feature = "R5")]
helios_fhir::FhirVersion::R5 => {
let params: helios_fhir::r5::Parameters = serde_json::from_value(json)
.map_err(|e| ServerError::BadRequest(format!("Invalid R5 Parameters: {}", e)))?;
Ok(RunParameters::R5(params))
}
#[cfg(feature = "R6")]
helios_fhir::FhirVersion::R6 => {
let params: helios_fhir::r6::Parameters = serde_json::from_value(json)
.map_err(|e| ServerError::BadRequest(format!("Invalid R6 Parameters: {}", e)))?;
Ok(RunParameters::R6(params))
}
}
}
fn create_bundle_from_resources(resources: Vec<serde_json::Value>) -> ServerResult<SofBundle> {
create_bundle_from_resources_for_version(resources, get_newest_enabled_fhir_version())
}
fn create_bundle_from_resources_for_version(
resources: Vec<serde_json::Value>,
version: helios_fhir::FhirVersion,
) -> ServerResult<SofBundle> {
let bundle_json = serde_json::json!({
"resourceType": "Bundle",
"type": "collection",
"entry": resources.into_iter().map(|resource| {
serde_json::json!({
"resource": resource
})
}).collect::<Vec<_>>()
});
match version {
#[cfg(feature = "R4")]
helios_fhir::FhirVersion::R4 => {
let bundle: helios_fhir::r4::Bundle =
serde_json::from_value(bundle_json).map_err(|e| {
ServerError::InternalError(format!("Failed to create R4 Bundle: {}", e))
})?;
Ok(SofBundle::R4(bundle))
}
#[cfg(feature = "R4B")]
helios_fhir::FhirVersion::R4B => {
let bundle: helios_fhir::r4b::Bundle =
serde_json::from_value(bundle_json).map_err(|e| {
ServerError::InternalError(format!("Failed to create R4B Bundle: {}", e))
})?;
Ok(SofBundle::R4B(bundle))
}
#[cfg(feature = "R5")]
helios_fhir::FhirVersion::R5 => {
let bundle: helios_fhir::r5::Bundle =
serde_json::from_value(bundle_json).map_err(|e| {
ServerError::InternalError(format!("Failed to create R5 Bundle: {}", e))
})?;
Ok(SofBundle::R5(bundle))
}
#[cfg(feature = "R6")]
helios_fhir::FhirVersion::R6 => {
let bundle: helios_fhir::r6::Bundle =
serde_json::from_value(bundle_json).map_err(|e| {
ServerError::InternalError(format!("Failed to create R6 Bundle: {}", e))
})?;
Ok(SofBundle::R6(bundle))
}
}
}
fn extract_resources_from_bundle(bundle: &SofBundle) -> ServerResult<Vec<serde_json::Value>> {
let mut resources = Vec::new();
match bundle {
#[cfg(feature = "R4")]
SofBundle::R4(bundle) => {
if let Some(entries) = &bundle.entry {
for entry in entries {
if let Some(resource) = &entry.resource {
resources.push(serde_json::to_value(resource)?);
}
}
}
}
#[cfg(feature = "R4B")]
SofBundle::R4B(bundle) => {
if let Some(entries) = &bundle.entry {
for entry in entries {
if let Some(resource) = &entry.resource {
resources.push(serde_json::to_value(resource)?);
}
}
}
}
#[cfg(feature = "R5")]
SofBundle::R5(bundle) => {
if let Some(entries) = &bundle.entry {
for entry in entries {
if let Some(resource) = &entry.resource {
resources.push(serde_json::to_value(resource)?);
}
}
}
}
#[cfg(feature = "R6")]
SofBundle::R6(bundle) => {
if let Some(entries) = &bundle.entry {
for entry in entries {
if let Some(resource) = &entry.resource {
resources.push(serde_json::to_value(resource)?);
}
}
}
}
}
Ok(resources)
}
fn merge_bundles(
source_bundle: SofBundle,
additional_resources: Vec<serde_json::Value>,
) -> ServerResult<SofBundle> {
let mut all_resources = Vec::new();
match source_bundle {
#[cfg(feature = "R4")]
SofBundle::R4(bundle) => {
if let Some(entries) = bundle.entry {
for entry in entries {
if let Some(resource) = entry.resource {
all_resources.push(serde_json::to_value(&resource)?);
}
}
}
}
#[cfg(feature = "R4B")]
SofBundle::R4B(bundle) => {
if let Some(entries) = bundle.entry {
for entry in entries {
if let Some(resource) = entry.resource {
all_resources.push(serde_json::to_value(&resource)?);
}
}
}
}
#[cfg(feature = "R5")]
SofBundle::R5(bundle) => {
if let Some(entries) = bundle.entry {
for entry in entries {
if let Some(resource) = entry.resource {
all_resources.push(serde_json::to_value(&resource)?);
}
}
}
}
#[cfg(feature = "R6")]
SofBundle::R6(bundle) => {
if let Some(entries) = bundle.entry {
for entry in entries {
if let Some(resource) = entry.resource {
all_resources.push(serde_json::to_value(&resource)?);
}
}
}
}
}
all_resources.extend(additional_resources);
create_bundle_from_resources(all_resources)
}
fn filter_resources_by_patient_and_group(
resources: Vec<serde_json::Value>,
patient_ref: Option<&str>,
group_ref: Option<&str>,
) -> ServerResult<Vec<serde_json::Value>> {
let mut filtered = resources;
if let Some(patient_ref) = patient_ref {
let normalized_patient_ref = if patient_ref.starts_with("Patient/") {
patient_ref.to_string()
} else {
format!("Patient/{}", patient_ref)
};
debug!(
"Filtering resources by patient: {} (normalized: {})",
patient_ref, normalized_patient_ref
);
let patient_ref_to_match = normalized_patient_ref.as_str();
filtered.retain(|resource| {
if let Some(resource_type) = resource.get("resourceType").and_then(|r| r.as_str()) {
match resource_type {
"Patient" => {
if let Some(id) = resource.get("id").and_then(|i| i.as_str()) {
return format!("Patient/{}", id) == patient_ref_to_match;
}
}
"Observation" | "Condition" | "MedicationRequest" | "Procedure" => {
if let Some(subject) = resource.get("subject") {
if let Some(reference) =
subject.get("reference").and_then(|r| r.as_str())
{
return reference == patient_ref_to_match;
}
}
}
"Encounter" => {
if let Some(subject) = resource.get("subject") {
if let Some(reference) =
subject.get("reference").and_then(|r| r.as_str())
{
return reference == patient_ref_to_match;
}
}
}
_ => {
if let Some(patient) = resource.get("patient") {
if let Some(reference) =
patient.get("reference").and_then(|r| r.as_str())
{
return reference == patient_ref_to_match;
}
}
}
}
}
false
});
}
if let Some(_group_ref) = group_ref {
return Err(ServerError::NotImplemented(
"Group filtering is not yet implemented".to_string(),
));
}
Ok(filtered)
}
fn filter_resources_by_since(
resources: Vec<serde_json::Value>,
since: DateTime<Utc>,
) -> ServerResult<Vec<serde_json::Value>> {
debug!("Filtering resources modified since: {}", since);
let filtered: Vec<serde_json::Value> = resources
.into_iter()
.filter(|resource| {
if let Some(meta) = resource.get("meta") {
if let Some(last_updated) = meta.get("lastUpdated").and_then(|lu| lu.as_str()) {
match DateTime::parse_from_rfc3339(last_updated) {
Ok(resource_updated) => {
return resource_updated.with_timezone(&Utc) > since;
}
Err(e) => {
debug!(
"Failed to parse lastUpdated timestamp '{}': {}",
last_updated, e
);
}
}
}
}
false
})
.collect();
debug!("Filtered {} resources by _since parameter", filtered.len());
Ok(filtered)
}
pub async fn health_check() -> impl IntoResponse {
info!("Handling Health Check request");
Json(serde_json::json!({
"status": "ok",
"service": "sof-server",
"version": env!("CARGO_PKG_VERSION"),
"timestamp": chrono::Utc::now().to_rfc3339()
}))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_capability_statement_structure() {
let cap_stmt = create_capability_statement();
assert_eq!(cap_stmt["resourceType"], "CapabilityStatement");
assert_eq!(cap_stmt["kind"], "instance");
assert_eq!(cap_stmt["fhirVersion"], get_fhir_version_string());
let operations = &cap_stmt["rest"][0]["operation"];
assert!(operations.as_array().is_some());
assert_eq!(operations[0]["name"], "viewdefinition-run");
}
#[test]
fn test_filter_resources_by_patient() {
let resources = vec![
serde_json::json!({
"resourceType": "Patient",
"id": "123"
}),
serde_json::json!({
"resourceType": "Patient",
"id": "456"
}),
serde_json::json!({
"resourceType": "Observation",
"id": "obs1",
"subject": {
"reference": "Patient/123"
}
}),
serde_json::json!({
"resourceType": "Observation",
"id": "obs2",
"subject": {
"reference": "Patient/456"
}
}),
];
let filtered =
filter_resources_by_patient_and_group(resources, Some("Patient/123"), None).unwrap();
assert_eq!(filtered.len(), 2);
assert_eq!(filtered[0]["id"], "123");
assert_eq!(filtered[1]["id"], "obs1");
}
#[test]
fn test_filter_resources_with_group_returns_error() {
let resources = vec![serde_json::json!({
"resourceType": "Patient",
"id": "123"
})];
let result = filter_resources_by_patient_and_group(resources, None, Some("Group/test"));
assert!(result.is_err());
if let Err(ServerError::NotImplemented(msg)) = result {
assert!(msg.contains("Group filtering is not yet implemented"));
} else {
panic!("Expected NotImplemented error");
}
}
#[test]
fn test_resolve_view_reference_relative() {
let result = resolve_view_reference("ViewDefinition/123");
assert!(result.is_err());
if let Err(ServerError::NotImplemented(msg)) = result {
assert!(msg.contains("Relative ViewDefinition references are not supported"));
} else {
panic!("Expected NotImplemented error");
}
}
#[test]
fn test_resolve_view_reference_canonical() {
let result = resolve_view_reference("http://example.org/ViewDefinition/test|1.0.0");
assert!(result.is_err());
if let Err(ServerError::NotImplemented(msg)) = result {
assert!(msg.contains("Canonical URL references with versions are not yet supported"));
} else {
panic!("Expected NotImplemented error");
}
}
#[test]
fn test_resolve_view_reference_absolute() {
let result = resolve_view_reference("http://example.org/ViewDefinition/123");
assert!(result.is_err());
if let Err(ServerError::NotImplemented(msg)) = result {
assert!(
msg.contains("Loading ViewDefinitions from external URLs is not yet implemented")
);
} else {
panic!("Expected NotImplemented error");
}
}
}