use async_trait::async_trait;
use helios_fhir::FhirVersion;
use helios_sof::{
PreparedViewDefinition, ResourceChunk, filter_resources_by_patient_and_group,
filter_resources_by_since, parse_view_definition_for_version,
};
use serde_json::{Map, Value};
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
use crate::tenant::TenantContext;
const CHANNEL_BUFFER: usize = 256;
const CHUNK_SIZE: usize = 1024;
#[async_trait]
pub trait ResourceScan: Send + Sync {
async fn scan_resources(
&self,
tenant: &TenantContext,
resource_type: &str,
) -> Result<Vec<Value>, SofError>;
}
pub struct InProcessSofRunner {
scan: std::sync::Arc<dyn ResourceScan>,
fhir_version: FhirVersion,
runner_name: &'static str,
}
impl InProcessSofRunner {
pub fn new(
scan: std::sync::Arc<dyn ResourceScan>,
fhir_version: FhirVersion,
runner_name: &'static str,
) -> Self {
Self {
scan,
fhir_version,
runner_name,
}
}
}
fn map_engine_error(e: helios_sof::SofError) -> SofError {
use helios_sof::SofError as E;
match e {
E::InvalidViewDefinition(m) => SofError::InvalidViewDefinition(m),
E::ReferencedResourceNotFound(m) => SofError::InvalidViewDefinition(m),
other => SofError::Backend(other.to_string()),
}
}
fn row_to_view_row(columns: &[String], values: &[Option<Value>]) -> ViewRow {
let mut obj = Map::with_capacity(columns.len());
for (i, column) in columns.iter().enumerate() {
let value = values.get(i).and_then(|v| v.as_ref()).cloned();
obj.insert(column.clone(), value.unwrap_or(Value::Null));
}
Value::Object(obj)
}
#[async_trait]
impl SofRunner for InProcessSofRunner {
fn runner_name(&self) -> &'static str {
self.runner_name
}
async fn run_view(
&self,
tenant: &TenantContext,
view_definition: Value,
filters: ViewFilters,
) -> Result<RowStream, SofError> {
let view = parse_view_definition_for_version(view_definition, self.fhir_version)
.map_err(map_engine_error)?;
let prepared = PreparedViewDefinition::new(view).map_err(map_engine_error)?;
let resource_type = prepared.target_resource_type().to_string();
debug!(
runner = self.runner_name,
tenant = %tenant.tenant_id(),
resource_type = %resource_type,
"executing in-process ViewDefinition"
);
let mut resources = self.scan.scan_resources(tenant, &resource_type).await?;
if let Some(since) = filters.since {
resources = filter_resources_by_since(resources, since).map_err(map_engine_error)?;
}
if !filters.patient.is_empty() || !filters.group.is_empty() {
resources = filter_resources_by_patient_and_group(
resources,
&filters.patient,
&filters.group,
self.fhir_version,
)
.map_err(map_engine_error)?;
}
let limit = filters.limit;
let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
tokio::task::spawn_blocking(move || {
let columns = prepared.columns().to_vec();
let total = resources.len();
let mut emitted = 0usize;
let mut offset = 0usize;
let mut chunk_index = 0usize;
while offset < total {
let end = (offset + CHUNK_SIZE).min(total);
let chunk = ResourceChunk {
resources: resources[offset..end].to_vec(),
chunk_index,
is_last: end >= total,
};
offset = end;
chunk_index += 1;
let result = match prepared.process_chunk(chunk) {
Ok(r) => r,
Err(e) => {
let _ = tx.blocking_send(Err(map_engine_error(e)));
return;
}
};
for row in &result.rows {
if let Some(cap) = limit
&& emitted >= cap
{
return;
}
emitted += 1;
let view_row = row_to_view_row(&columns, &row.values);
if tx.blocking_send(Ok(view_row)).is_err() {
return;
}
}
}
debug!(rows = emitted, "in-process view run complete");
});
Ok(Box::pin(ReceiverStream::new(rx)))
}
}