Skip to main content

helios_persistence/core/
sof_runner.rs

1//! SQL-on-FHIR runner abstraction.
2//!
3//! This module defines the [`SofRunner`] trait, implemented per-backend. Two
4//! strategies exist:
5//!
6//! - **In-DB runners** compile a [`ViewDefinition`] to a native query executed
7//!   inside the storage backend, skipping FHIRPath evaluation entirely: SQLite
8//!   and PostgreSQL compile to SQL, MongoDB compiles to an aggregation pipeline.
9//! - **In-process runners** stream the resources out of a backend that has no
10//!   query engine (S3 object storage, and S3-primary composites) and evaluate
11//!   the view with the `helios-sof` FHIRPath engine
12//!   ([`InProcessSofRunner`](crate::sof::in_process::InProcessSofRunner)).
13//!
14//! If the configured backend provides no runner at all, the
15//! `$viewdefinition-run` handler returns `501 Not Implemented`. Inline
16//! `resource:` parameters are materialised into a transient in-memory SQLite
17//! backend so they reuse the same in-DB pipeline.
18//!
19//! The handler layer streams the result rows directly into the HTTP response.
20
21use std::pin::Pin;
22
23use async_trait::async_trait;
24use futures::Stream;
25use serde_json::Value;
26
27use crate::tenant::TenantContext;
28
29/// Filters that narrow which resources are processed by a view run.
30///
31/// Per the SQL-on-FHIR v2 spec, `patient` and `group` are `0..*` — supplying
32/// multiple values must include resources matching ANY of them (union of the
33/// corresponding compartments).
34#[derive(Debug, Clone, Default)]
35pub struct ViewFilters {
36    /// Restrict to resources belonging to these patients (FHIR references,
37    /// e.g. `Patient/123`). Multiple values are unioned: a resource that
38    /// matches any reference is included.
39    pub patient: Vec<String>,
40
41    /// Restrict to resources belonging to these groups (FHIR references,
42    /// e.g. `Group/abc`). Multiple values are unioned.
43    pub group: Vec<String>,
44
45    /// Include only resources last-modified at or after this instant (RFC 3339).
46    pub since: Option<chrono::DateTime<chrono::Utc>>,
47
48    /// Maximum number of output rows to return (across all pages).
49    pub limit: Option<usize>,
50}
51
52/// A single output row from a view run.
53///
54/// Each row is a flat JSON object whose keys come from the ViewDefinition's `select`
55/// columns. Nested columns are dot-joined by convention (`name.family`).
56pub type ViewRow = Value;
57
58/// A pinned, heap-allocated, `Send + 'static` stream of view rows.
59///
60/// Streams returned by runners must own all their state (e.g. via cloned
61/// `Arc`s or owned `Vec`s) so that the caller can move them across tasks
62/// — for example, into an HTTP response body. The previous `'a` lifetime
63/// turned out to be unused by every implementation and prevented streaming
64/// responses, so it was removed.
65pub type RowStream = Pin<Box<dyn Stream<Item = Result<ViewRow, SofError>> + Send + 'static>>;
66
67/// Errors that can occur during SQL-on-FHIR view execution.
68#[derive(Debug, thiserror::Error)]
69pub enum SofError {
70    /// The ViewDefinition contains constructs that this runner cannot compile or execute.
71    ///
72    /// The `reason` field describes which construct is unsupported. The handler layer
73    /// maps this variant to a `422 Unprocessable Entity` OperationOutcome.
74    #[error("view definition is not compilable by this runner: {reason}")]
75    Uncompilable {
76        /// Human-readable description of the unsupported construct.
77        reason: String,
78    },
79
80    /// The ViewDefinition JSON is structurally invalid (missing required fields, wrong types).
81    #[error("invalid view definition: {0}")]
82    InvalidViewDefinition(String),
83
84    /// An error occurred while fetching resources from the storage backend.
85    #[error("storage error: {0}")]
86    Storage(String),
87
88    /// A backend-level SQL or driver error.
89    #[error("backend error: {0}")]
90    Backend(String),
91
92    /// The view run was cancelled (e.g. client disconnected, export job cancelled).
93    #[error("view run cancelled")]
94    Cancelled,
95}
96
97/// Abstraction over in-process and in-DB SQL-on-FHIR execution strategies.
98///
99/// # Object safety
100///
101/// `SofRunner` is object-safe and intended for use as `Arc<dyn SofRunner>`. The
102/// [`run_view`] method returns a heap-allocated [`RowStream`] to avoid associated
103/// types that would break object safety.
104///
105/// # Threading
106///
107/// Implementors must be `Send + Sync` so that the runner can be stored in `AppState`
108/// and shared across request tasks.
109#[async_trait]
110pub trait SofRunner: Send + Sync {
111    /// Execute a ViewDefinition and return a stream of output rows.
112    ///
113    /// # Arguments
114    ///
115    /// * `tenant` — The tenant context; all resource access is scoped to this tenant.
116    /// * `view_definition` — The raw ViewDefinition JSON (any FHIR version).
117    /// * `filters` — Optional filters (patient, group, since, limit).
118    ///
119    /// # Returns
120    ///
121    /// A [`RowStream`] that yields one flat JSON object per output row. The stream
122    /// may be infinite in theory; callers should honour the `filters.limit` cap or
123    /// impose their own.
124    ///
125    /// # Errors
126    ///
127    /// Returns [`SofError::Uncompilable`] synchronously (before the stream is polled)
128    /// when this runner cannot handle the given ViewDefinition. The handler layer
129    /// must catch this and either fall back to the in-process runner or return `422`.
130    async fn run_view(
131        &self,
132        tenant: &TenantContext,
133        view_definition: Value,
134        filters: ViewFilters,
135    ) -> Result<RowStream, SofError>;
136
137    /// Returns a human-readable name for this runner (used in logs and diagnostics).
138    fn runner_name(&self) -> &'static str;
139}