helios_persistence/sof/
in_process.rs1use async_trait::async_trait;
17use helios_fhir::FhirVersion;
18use helios_sof::{
19 PreparedViewDefinition, ResourceChunk, filter_resources_by_patient_and_group,
20 filter_resources_by_since, parse_view_definition_for_version,
21};
22use serde_json::{Map, Value};
23use tokio_stream::wrappers::ReceiverStream;
24use tracing::debug;
25
26use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
27use crate::tenant::TenantContext;
28
/// Capacity of the bounded `tokio::sync::mpsc` channel that carries rows from
/// the blocking worker spawned in `run_view` to the returned row stream.
const CHANNEL_BUFFER: usize = 256;
31
/// Maximum number of resources placed in a single `ResourceChunk` before it is
/// handed to `PreparedViewDefinition::process_chunk`.
const CHUNK_SIZE: usize = 1024;
35
/// Storage-side hook used by [`InProcessSofRunner`] to obtain the resources a
/// ViewDefinition is evaluated against.
///
/// Implementations must be `Send + Sync` because the runner stores them behind
/// an `Arc<dyn ResourceScan>`.
#[async_trait]
pub trait ResourceScan: Send + Sync {
    /// Fetches resources of `resource_type` for `tenant` as JSON values.
    ///
    /// The whole result is returned as one in-memory `Vec`; there is no
    /// pagination at this interface.
    ///
    /// NOTE(review): presumably this returns *every* stored resource of the
    /// given type visible to the tenant — confirm against the implementors.
    ///
    /// # Errors
    ///
    /// Returns a [`SofError`] chosen by the implementation when the scan fails.
    async fn scan_resources(
        &self,
        tenant: &TenantContext,
        resource_type: &str,
    ) -> Result<Vec<Value>, SofError>;
}
51
/// [`SofRunner`] implementation that evaluates ViewDefinitions inside the
/// current process using the `helios_sof` engine.
pub struct InProcessSofRunner {
    /// Source of the resources each view run is evaluated against.
    scan: std::sync::Arc<dyn ResourceScan>,
    /// FHIR version used when parsing the ViewDefinition and when applying
    /// patient/group filters.
    fhir_version: FhirVersion,
    /// Label returned by `SofRunner::runner_name` and attached to debug logs.
    runner_name: &'static str,
}
58
59impl InProcessSofRunner {
60 pub fn new(
64 scan: std::sync::Arc<dyn ResourceScan>,
65 fhir_version: FhirVersion,
66 runner_name: &'static str,
67 ) -> Self {
68 Self {
69 scan,
70 fhir_version,
71 runner_name,
72 }
73 }
74}
75
76fn map_engine_error(e: helios_sof::SofError) -> SofError {
82 use helios_sof::SofError as E;
83 match e {
84 E::InvalidViewDefinition(m) => SofError::InvalidViewDefinition(m),
85 E::ReferencedResourceNotFound(m) => SofError::InvalidViewDefinition(m),
86 other => SofError::Backend(other.to_string()),
87 }
88}
89
90fn row_to_view_row(columns: &[String], values: &[Option<Value>]) -> ViewRow {
93 let mut obj = Map::with_capacity(columns.len());
94 for (i, column) in columns.iter().enumerate() {
95 let value = values.get(i).and_then(|v| v.as_ref()).cloned();
96 obj.insert(column.clone(), value.unwrap_or(Value::Null));
97 }
98 Value::Object(obj)
99}
100
101#[async_trait]
102impl SofRunner for InProcessSofRunner {
103 fn runner_name(&self) -> &'static str {
104 self.runner_name
105 }
106
107 async fn run_view(
108 &self,
109 tenant: &TenantContext,
110 view_definition: Value,
111 filters: ViewFilters,
112 ) -> Result<RowStream, SofError> {
113 let view = parse_view_definition_for_version(view_definition, self.fhir_version)
114 .map_err(map_engine_error)?;
115 let prepared = PreparedViewDefinition::new(view).map_err(map_engine_error)?;
116 let resource_type = prepared.target_resource_type().to_string();
117
118 debug!(
119 runner = self.runner_name,
120 tenant = %tenant.tenant_id(),
121 resource_type = %resource_type,
122 "executing in-process ViewDefinition"
123 );
124
125 let mut resources = self.scan.scan_resources(tenant, &resource_type).await?;
128 if let Some(since) = filters.since {
129 resources = filter_resources_by_since(resources, since).map_err(map_engine_error)?;
130 }
131 if !filters.patient.is_empty() || !filters.group.is_empty() {
132 resources = filter_resources_by_patient_and_group(
133 resources,
134 &filters.patient,
135 &filters.group,
136 self.fhir_version,
137 )
138 .map_err(map_engine_error)?;
139 }
140
141 let limit = filters.limit;
142 let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
143
144 tokio::task::spawn_blocking(move || {
147 let columns = prepared.columns().to_vec();
148 let total = resources.len();
149 let mut emitted = 0usize;
150 let mut offset = 0usize;
151 let mut chunk_index = 0usize;
152
153 while offset < total {
154 let end = (offset + CHUNK_SIZE).min(total);
155 let chunk = ResourceChunk {
156 resources: resources[offset..end].to_vec(),
157 chunk_index,
158 is_last: end >= total,
159 };
160 offset = end;
161 chunk_index += 1;
162
163 let result = match prepared.process_chunk(chunk) {
164 Ok(r) => r,
165 Err(e) => {
166 let _ = tx.blocking_send(Err(map_engine_error(e)));
167 return;
168 }
169 };
170
171 for row in &result.rows {
172 if let Some(cap) = limit
173 && emitted >= cap
174 {
175 return;
176 }
177 emitted += 1;
178 let view_row = row_to_view_row(&columns, &row.values);
179 if tx.blocking_send(Ok(view_row)).is_err() {
180 return;
182 }
183 }
184 }
185
186 debug!(rows = emitted, "in-process view run complete");
187 });
188
189 Ok(Box::pin(ReceiverStream::new(rx)))
190 }
191}