Skip to main content

helios_persistence/sof/
in_process.rs

1//! Generic in-process SQL-on-FHIR runner.
2//!
3//! Unlike the SQLite and PostgreSQL runners — which compile a ViewDefinition to
4//! native SQL and execute it inside the database — this runner streams the
5//! resources of the view's target type out of a backend and evaluates the
6//! ViewDefinition with the in-process `helios-sof` FHIRPath engine. It exists
7//! for backends that have no query engine to push the computation into (S3
8//! object storage, and S3-primary composites such as `s3-elasticsearch`).
9//!
10//! The heavy lifting is delegated to [`helios_sof::PreparedViewDefinition`],
11//! the same engine that serves inline `resource:` runs and the `sof-cli`/`sof-server`
12//! tools, so this runner introduces no new SQL-on-FHIR semantics. Resource
13//! access is abstracted behind [`ResourceScan`] so the runner is
14//! backend-agnostic.
15
16use async_trait::async_trait;
17use helios_fhir::FhirVersion;
18use helios_sof::{
19    PreparedViewDefinition, ResourceChunk, filter_resources_by_patient_and_group,
20    filter_resources_by_since, parse_view_definition_for_version,
21};
22use serde_json::{Map, Value};
23use tokio_stream::wrappers::ReceiverStream;
24use tracing::debug;
25
26use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
27use crate::tenant::TenantContext;
28
/// Capacity of the bounded channel that carries rows from the evaluation task
/// to the returned row stream. Once this many rows are queued, the producer
/// waits for the consumer to catch up.
const CHANNEL_BUFFER: usize = 256;

/// Maximum number of resources passed to the engine in a single
/// [`PreparedViewDefinition::process_chunk`] call.
///
/// This caps the size of each batch the engine works on. The complete scan
/// result is already held in memory before chunking begins, so it limits the
/// per-call working set rather than the total footprint of a run.
const CHUNK_SIZE: usize = 1024;
35
36/// Streams the live resources of a single resource type for a tenant.
37///
38/// Implemented by backends that lack an in-DB SOF runner so they can reuse
39/// [`InProcessSofRunner`]. Implementations must return the raw FHIR resource
40/// JSON (the `content()` of each stored resource), excluding soft-deleted ones.
41#[async_trait]
42pub trait ResourceScan: Send + Sync {
43    /// Returns every live resource of `resource_type` visible to `tenant` as
44    /// raw FHIR JSON.
45    async fn scan_resources(
46        &self,
47        tenant: &TenantContext,
48        resource_type: &str,
49    ) -> Result<Vec<Value>, SofError>;
50}
51
52/// In-process SQL-on-FHIR runner backed by an arbitrary [`ResourceScan`].
53pub struct InProcessSofRunner {
54    scan: std::sync::Arc<dyn ResourceScan>,
55    fhir_version: FhirVersion,
56    runner_name: &'static str,
57}
58
59impl InProcessSofRunner {
60    /// Creates a runner that scans resources via `scan` and evaluates views
61    /// against `fhir_version`. `runner_name` is surfaced in logs/diagnostics
62    /// (e.g. `"s3-in-process"`).
63    pub fn new(
64        scan: std::sync::Arc<dyn ResourceScan>,
65        fhir_version: FhirVersion,
66        runner_name: &'static str,
67    ) -> Self {
68        Self {
69            scan,
70            fhir_version,
71            runner_name,
72        }
73    }
74}
75
76/// Maps a `helios_sof` engine error onto the persistence-layer [`SofError`].
77///
78/// Structural/validation problems and the spec's absent-target case become
79/// `InvalidViewDefinition` (surfaced as a 4xx); everything else is a runtime
80/// backend error.
81fn map_engine_error(e: helios_sof::SofError) -> SofError {
82    use helios_sof::SofError as E;
83    match e {
84        E::InvalidViewDefinition(m) => SofError::InvalidViewDefinition(m),
85        E::ReferencedResourceNotFound(m) => SofError::InvalidViewDefinition(m),
86        other => SofError::Backend(other.to_string()),
87    }
88}
89
90/// Zips one engine row (`columns` × `values`) into a flat JSON object, the
91/// same shape the SQL runners emit. Missing/`None` values become JSON `null`.
92fn row_to_view_row(columns: &[String], values: &[Option<Value>]) -> ViewRow {
93    let mut obj = Map::with_capacity(columns.len());
94    for (i, column) in columns.iter().enumerate() {
95        let value = values.get(i).and_then(|v| v.as_ref()).cloned();
96        obj.insert(column.clone(), value.unwrap_or(Value::Null));
97    }
98    Value::Object(obj)
99}
100
101#[async_trait]
102impl SofRunner for InProcessSofRunner {
103    fn runner_name(&self) -> &'static str {
104        self.runner_name
105    }
106
107    async fn run_view(
108        &self,
109        tenant: &TenantContext,
110        view_definition: Value,
111        filters: ViewFilters,
112    ) -> Result<RowStream, SofError> {
113        let view = parse_view_definition_for_version(view_definition, self.fhir_version)
114            .map_err(map_engine_error)?;
115        let prepared = PreparedViewDefinition::new(view).map_err(map_engine_error)?;
116        let resource_type = prepared.target_resource_type().to_string();
117
118        debug!(
119            runner = self.runner_name,
120            tenant = %tenant.tenant_id(),
121            resource_type = %resource_type,
122            "executing in-process ViewDefinition"
123        );
124
125        // Scan the view's target type, then apply the SoF run filters that the
126        // engine itself does not handle (patient/group compartment and `since`).
127        let mut resources = self.scan.scan_resources(tenant, &resource_type).await?;
128        if let Some(since) = filters.since {
129            resources = filter_resources_by_since(resources, since).map_err(map_engine_error)?;
130        }
131        if !filters.patient.is_empty() || !filters.group.is_empty() {
132            resources = filter_resources_by_patient_and_group(
133                resources,
134                &filters.patient,
135                &filters.group,
136                self.fhir_version,
137            )
138            .map_err(map_engine_error)?;
139        }
140
141        let limit = filters.limit;
142        let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
143
144        // FHIRPath evaluation is CPU-bound (and `process_chunk` parallelises via
145        // rayon), so run it off the async runtime.
146        tokio::task::spawn_blocking(move || {
147            let columns = prepared.columns().to_vec();
148            let total = resources.len();
149            let mut emitted = 0usize;
150            let mut offset = 0usize;
151            let mut chunk_index = 0usize;
152
153            while offset < total {
154                let end = (offset + CHUNK_SIZE).min(total);
155                let chunk = ResourceChunk {
156                    resources: resources[offset..end].to_vec(),
157                    chunk_index,
158                    is_last: end >= total,
159                };
160                offset = end;
161                chunk_index += 1;
162
163                let result = match prepared.process_chunk(chunk) {
164                    Ok(r) => r,
165                    Err(e) => {
166                        let _ = tx.blocking_send(Err(map_engine_error(e)));
167                        return;
168                    }
169                };
170
171                for row in &result.rows {
172                    if let Some(cap) = limit
173                        && emitted >= cap
174                    {
175                        return;
176                    }
177                    emitted += 1;
178                    let view_row = row_to_view_row(&columns, &row.values);
179                    if tx.blocking_send(Ok(view_row)).is_err() {
180                        // Receiver dropped (client disconnected) — stop.
181                        return;
182                    }
183                }
184            }
185
186            debug!(rows = emitted, "in-process view run complete");
187        });
188
189        Ok(Box::pin(ReceiverStream::new(rx)))
190    }
191}