Skip to main content

helios_persistence/sof/
sqlite.rs

1//! SQLite in-DB SQL-on-FHIR runner.
2//!
3//! [`SqliteInDbRunner`] compiles a ViewDefinition to a parameterised SQLite
4//! `SELECT` statement and executes it directly against the `resources` table,
5//! bypassing in-process FHIRPath evaluation entirely.
6//!
7//! ## Streaming
8//!
9//! Rows are sent one-by-one through a bounded `tokio::sync::mpsc` channel
10//! (buffer: 256) so the HTTP layer can begin flushing to the client before the
11//! full result set is read.  The blocking SQLite iteration runs in a dedicated
12//! `spawn_blocking` thread so it never stalls the async runtime.
13
14use helios_fhir::FhirVersion;
15use r2d2::Pool;
16use r2d2_sqlite::SqliteConnectionManager;
17use rusqlite::types::ValueRef;
18use serde_json::{Map, Value};
19use tokio_stream::wrappers::ReceiverStream;
20use tracing::debug;
21
22use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
23use crate::tenant::TenantContext;
24
25use super::compiler::{SqlDialect, compile_view_definition_dialect};
26
/// Channel buffer depth (rows that can be queued ahead of the consumer).
///
/// This is the capacity of the bounded `tokio::sync::mpsc` channel between
/// the blocking SQLite reader and the async consumer; once this many rows are
/// queued the reader waits in `blocking_send` until the consumer catches up.
const CHANNEL_BUFFER: usize = 256;
29
/// SQL-on-FHIR runner that compiles ViewDefinitions to SQLite SQL.
pub struct SqliteInDbRunner {
    /// Connection pool used both for `Group` lookups and for executing the
    /// compiled query.
    pool: Pool<SqliteConnectionManager>,
    /// FHIR version passed to the ViewDefinition compiler and to the
    /// compartment search-parameter lookup.
    fhir_version: FhirVersion,
}
35
36impl SqliteInDbRunner {
37    /// Creates a new runner backed by the given connection pool. Uses the
38    /// default FHIR version (R4) for compile-time cardinality lookups; call
39    /// [`Self::with_fhir_version`] to override.
40    pub fn new(pool: Pool<SqliteConnectionManager>) -> Self {
41        Self {
42            pool,
43            fhir_version: FhirVersion::default_enabled(),
44        }
45    }
46
47    /// Returns a runner that consults the given FHIR version's field-type
48    /// table when validating `collection: false` columns.
49    pub fn with_fhir_version(mut self, version: FhirVersion) -> Self {
50        self.fhir_version = version;
51        self
52    }
53}
54
55#[async_trait::async_trait]
56impl SofRunner for SqliteInDbRunner {
57    fn runner_name(&self) -> &'static str {
58        "sqlite-indb"
59    }
60
61    async fn run_view(
62        &self,
63        tenant: &TenantContext,
64        view_definition: Value,
65        mut filters: ViewFilters,
66    ) -> Result<RowStream, SofError> {
67        // Compile synchronously (cheap, no I/O)
68        let compiled = compile_view_definition_dialect(
69            &view_definition,
70            SqlDialect::Sqlite,
71            self.fhir_version,
72        )?;
73
74        debug!(
75            runner = "sqlite-indb",
76            tenant = %tenant.tenant_id(),
77            "executing compiled ViewDefinition"
78        );
79
80        let tenant_id = tenant.tenant_id().to_string();
81        let resource_type = view_definition
82            .get("resource")
83            .and_then(|v| v.as_str())
84            .unwrap_or("")
85            .to_string();
86
87        // Spec-correct `group` handling: resolve each Group/{id} to its
88        // `member.entity` Patient references and fold them into the patient
89        // filter, mirroring the inline path's behavior. Group resolution
90        // is an extra DB read per group ref; once done we clear the
91        // group_refs so build_sqlite_sql doesn't double-apply.
92        if !filters.group.is_empty() {
93            let resolved =
94                resolve_group_refs_to_patient_refs(&self.pool, &tenant_id, &filters.group)?;
95            for p in resolved {
96                if !filters.patient.iter().any(|existing| existing == &p) {
97                    filters.patient.push(p);
98                }
99            }
100            filters.group.clear();
101        }
102
103        let limit = filters.limit;
104        let columns = compiled.columns.clone();
105        let pool = self.pool.clone();
106
107        // Inject runtime filter conditions (since, patient/group). The
108        // compiled query already reserves `?3..?N` for ViewDefinition
109        // constants; runtime filters allocate from the next free slot.
110        let (sql, extra_params) = build_sqlite_sql(
111            &compiled.sql,
112            &compiled.constants,
113            &filters,
114            self.fhir_version,
115            &resource_type,
116        );
117
118        let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
119
120        tokio::task::spawn_blocking(move || {
121            stream_sqlite_rows(
122                &pool,
123                &sql,
124                &tenant_id,
125                &resource_type,
126                extra_params,
127                &columns,
128                limit,
129                tx,
130            );
131        });
132
133        Ok(Box::pin(ReceiverStream::new(rx)))
134    }
135}
136
137/// Loads each `Group/{id}` from the `resources` table and extracts its
138/// `member.entity` Patient references via the shared
139/// [`helios_sof::resolve_group_members_to_patient_refs`]. Returns the
140/// union of those Patient refs across all supplied group refs. Unknown
141/// groups are silently skipped (matches the inline path; absent-target
142/// warning is audit item #5).
143fn resolve_group_refs_to_patient_refs(
144    pool: &Pool<SqliteConnectionManager>,
145    tenant_id: &str,
146    group_refs: &[String],
147) -> Result<Vec<String>, SofError> {
148    if group_refs.is_empty() {
149        return Ok(Vec::new());
150    }
151    let conn = pool
152        .get()
153        .map_err(|e| SofError::Storage(format!("failed to get sqlite connection: {e}")))?;
154    let mut stmt = conn
155        .prepare(
156            "SELECT data FROM resources \
157             WHERE tenant_id = ?1 \
158               AND resource_type = 'Group' \
159               AND id = ?2 \
160               AND is_deleted = 0",
161        )
162        .map_err(|e| SofError::Storage(format!("prepare failed: {e}")))?;
163
164    let mut groups = Vec::with_capacity(group_refs.len());
165    for r in group_refs {
166        let id = r.strip_prefix("Group/").unwrap_or(r);
167        let res: rusqlite::Result<Vec<u8>> = stmt.query_row([tenant_id, id], |row| row.get(0));
168        match res {
169            Ok(bytes) => match serde_json::from_slice::<Value>(&bytes) {
170                Ok(v) => groups.push(v),
171                Err(_) => continue,
172            },
173            Err(rusqlite::Error::QueryReturnedNoRows) => continue,
174            Err(e) => {
175                return Err(SofError::Storage(format!(
176                    "group lookup failed for {r}: {e}"
177                )));
178            }
179        }
180    }
181
182    let set = helios_sof::resolve_group_members_to_patient_refs(group_refs, &groups);
183    Ok(set.into_iter().collect())
184}
185
186// ============================================================================
187// SQL runtime-filter injection
188// ============================================================================
189
190/// Appends runtime filter conditions (`since`, `patient`) to the compiled SQL
191/// and returns the bound parameters that follow `tenant_id` and
192/// `resource_type` (i.e. ViewDefinition constants then runtime filter values).
193///
194/// SQLite positional parameters are `?1`, `?2`, … The base SQL always uses
195/// `?1 = tenant_id` and `?2 = resource_type`. Constants then occupy
196/// `?3..?(2+constants.len())`; runtime filter conditions bind from the next
197/// free slot.
198fn build_sqlite_sql(
199    base_sql: &str,
200    constants: &[super::ir::LitValue],
201    filters: &ViewFilters,
202    fhir_version: FhirVersion,
203    resource_type: &str,
204) -> (String, Vec<SqliteParam>) {
205    let mut conditions: Vec<String> = Vec::new();
206    let mut extra_params: Vec<SqliteParam> = constants
207        .iter()
208        .map(SqliteParam::from_lit)
209        .collect::<Vec<_>>();
210    let mut next_param = 3usize + constants.len();
211
212    if let Some(since) = &filters.since {
213        conditions.push(format!("r.last_updated >= ?{next_param}"));
214        // Store as RFC 3339 string — SQLite datetime columns are TEXT
215        extra_params.push(SqliteParam::Text(since.to_rfc3339()));
216        next_param += 1;
217    }
218
219    if let Some(c) = compartment_filter_sql(
220        fhir_version,
221        "Patient",
222        resource_type,
223        &filters.patient,
224        &mut next_param,
225        &mut extra_params,
226    ) {
227        conditions.push(c);
228    }
229
230    if let Some(c) = compartment_filter_sql(
231        fhir_version,
232        "Group",
233        resource_type,
234        &filters.group,
235        &mut next_param,
236        &mut extra_params,
237    ) {
238        conditions.push(c);
239    }
240
241    if conditions.is_empty() {
242        return (base_sql.to_string(), extra_params);
243    }
244
245    let joined = conditions.join(" AND ");
246    let sql = inject_before_order_by(base_sql, &format!(" AND {joined}"));
247    (sql, extra_params)
248}
249
250/// Builds a SQLite `WHERE` fragment that filters `r` to resources in the
251/// named compartment of any of `compartment_refs`. Drives the lookup off
252/// the spec's `CompartmentDefinition` via [`helios_fhir::compartment_params`]
253/// and queries the pre-populated `search_index` table — no FHIRPath
254/// evaluation at query time. Returns `None` when there are no compartment
255/// refs to filter by (skip the clause entirely).
256///
257/// Two cases:
258///
259/// 1. **Resource = compartment owner** (e.g. `compartment_type="Patient"`
260///    and `resource_type="Patient"`): match `r.id` against the id portion
261///    of each compartment ref.
262/// 2. **Other resource types**: look up
263///    [`helios_fhir::compartment_params`] to get the linking search-param
264///    names, then emit an `EXISTS (SELECT 1 FROM search_index …)` clause
265///    that joins on `(tenant_id, resource_type, resource_id)` and matches
266///    any of those param names against any of the compartment refs. If
267///    the resource type isn't in the compartment at all, emit `1=0` so
268///    the result set is empty (spec-correct).
269fn compartment_filter_sql(
270    fhir_version: FhirVersion,
271    compartment_type: &str,
272    resource_type: &str,
273    compartment_refs: &[String],
274    next_param: &mut usize,
275    extra_params: &mut Vec<SqliteParam>,
276) -> Option<String> {
277    if compartment_refs.is_empty() {
278        return None;
279    }
280
281    let canonical_prefix = format!("{}/", compartment_type);
282
283    // Case 1: the view's resource is the compartment owner itself.
284    if resource_type == compartment_type {
285        let mut ors: Vec<String> = Vec::with_capacity(compartment_refs.len());
286        for r in compartment_refs {
287            let id = r.strip_prefix(canonical_prefix.as_str()).unwrap_or(r);
288            let p = *next_param;
289            ors.push(format!("r.id = ?{p}"));
290            extra_params.push(SqliteParam::Text(id.to_string()));
291            *next_param += 1;
292        }
293        return Some(format!("({})", ors.join(" OR ")));
294    }
295
296    // Case 2: look up the search-param names that link `resource_type`
297    // to the compartment.
298    let names = helios_fhir::compartment_params(fhir_version, compartment_type, resource_type);
299    if names.is_empty() {
300        // Spec: "Server SHALL NOT return resources from patient compartments
301        // outside provided list." This resource type isn't a member of the
302        // compartment, so no rows can match.
303        return Some("1=0".to_string());
304    }
305
306    let mut name_placeholders = Vec::with_capacity(names.len());
307    for n in names {
308        let p = *next_param;
309        name_placeholders.push(format!("?{p}"));
310        extra_params.push(SqliteParam::Text((*n).to_string()));
311        *next_param += 1;
312    }
313
314    let mut ref_placeholders = Vec::with_capacity(compartment_refs.len());
315    for r in compartment_refs {
316        let canonical = if r.starts_with(canonical_prefix.as_str()) {
317            r.clone()
318        } else {
319            format!("{}{}", canonical_prefix, r)
320        };
321        let p = *next_param;
322        ref_placeholders.push(format!("?{p}"));
323        extra_params.push(SqliteParam::Text(canonical));
324        *next_param += 1;
325    }
326
327    // `?1` and `?2` are tenant_id and resource_type (bound by the outer
328    // query); we reuse them inside the EXISTS subquery so the search_index
329    // join stays tenant-isolated and resource-typed.
330    Some(format!(
331        "EXISTS (SELECT 1 FROM search_index si \
332         WHERE si.tenant_id = ?1 \
333           AND si.resource_type = ?2 \
334           AND si.resource_id = r.id \
335           AND si.param_name IN ({}) \
336           AND si.value_reference IN ({}))",
337        name_placeholders.join(","),
338        ref_placeholders.join(",")
339    ))
340}
341
/// Inserts `extra` before the trailing `ORDER BY` in `sql`, or appends it
/// when there is no `ORDER BY`.
///
/// The compiler emits `\nORDER BY …` (newline-prefixed), so the last
/// occurrence of that form is preferred; the space-prefixed form is the
/// fallback for hand-crafted SQL. Because SQL keywords are case-insensitive,
/// the keyword is matched ignoring ASCII case — otherwise a lower-case
/// `order by` would be missed and `extra` would silently end up inside the
/// ORDER BY clause.
///
/// `extra` is inserted verbatim; the caller supplies any leading
/// whitespace / `AND`.
fn inject_before_order_by(sql: &str, extra: &str) -> String {
    // ASCII upper-casing only rewrites single-byte characters, so every byte
    // offset in `haystack` is also a valid offset into `sql`.
    let haystack = sql.to_ascii_uppercase();
    let insert_at = ["\nORDER BY", " ORDER BY"]
        .iter()
        .find_map(|pat| haystack.rfind(*pat));

    match insert_at {
        Some(pos) => {
            let mut out = String::with_capacity(sql.len() + extra.len());
            out.push_str(&sql[..pos]);
            out.push_str(extra);
            out.push_str(&sql[pos..]);
            out
        }
        None => format!("{sql}{extra}"),
    }
}
359
360// ============================================================================
361// Typed parameter — same role as `PgParam` on the PostgreSQL runner.
362// ============================================================================
363
/// Bound-parameter value for the SQLite runner. Mirrors [`super::ir::LitValue`]
/// (with `Str` mapped to `Text`); `Text` is also used for the tenant id,
/// resource type and runtime filter strings.
#[derive(Clone, Debug)]
enum SqliteParam {
    /// String value; bound as SQLite TEXT.
    Text(String),
    /// Boolean; bound as INTEGER `1` / `0`.
    Bool(bool),
    /// 64-bit signed integer; bound as INTEGER.
    Int(i64),
    /// Decimal carried in its textual form. At bind time the text is parsed
    /// as `f64` and bound as REAL so that it compares numerically; only text
    /// that fails to parse is bound as TEXT.
    Decimal(String),
    /// SQL NULL.
    Null,
}
376
377impl SqliteParam {
378    fn from_lit(v: &super::ir::LitValue) -> Self {
379        match v {
380            super::ir::LitValue::Null => SqliteParam::Null,
381            super::ir::LitValue::Bool(b) => SqliteParam::Bool(*b),
382            super::ir::LitValue::Int(n) => SqliteParam::Int(*n),
383            super::ir::LitValue::Decimal(s) => SqliteParam::Decimal(s.clone()),
384            super::ir::LitValue::Str(s) => SqliteParam::Text(s.clone()),
385        }
386    }
387}
388
389impl rusqlite::ToSql for SqliteParam {
390    fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
391        use rusqlite::types::{ToSqlOutput, Value};
392        Ok(match self {
393            SqliteParam::Text(s) => ToSqlOutput::Borrowed(s.as_str().into()),
394            SqliteParam::Bool(b) => ToSqlOutput::Owned(Value::Integer(if *b { 1 } else { 0 })),
395            SqliteParam::Int(n) => ToSqlOutput::Owned(Value::Integer(*n)),
396            // Bind as REAL so SQLite's type-affinity rules let the value
397            // compare numerically against `json_extract` results (which are
398            // INTEGER/REAL for JSON numbers). Binding as TEXT puts the
399            // value in a different storage class and SQLite ranks any TEXT
400            // as greater than any numeric value, breaking `<` / `>`.
401            SqliteParam::Decimal(s) => match s.parse::<f64>() {
402                Ok(n) => ToSqlOutput::Owned(Value::Real(n)),
403                Err(_) => ToSqlOutput::Owned(Value::Text(s.clone())),
404            },
405            SqliteParam::Null => ToSqlOutput::Owned(Value::Null),
406        })
407    }
408}
409
410// ============================================================================
411// Blocking row iterator → channel
412// ============================================================================
413
414#[allow(clippy::too_many_arguments)]
415fn stream_sqlite_rows(
416    pool: &Pool<SqliteConnectionManager>,
417    sql: &str,
418    tenant_id: &str,
419    resource_type: &str,
420    extra_params: Vec<SqliteParam>,
421    columns: &[String],
422    limit: Option<usize>,
423    tx: tokio::sync::mpsc::Sender<Result<ViewRow, SofError>>,
424) {
425    let conn = match pool.get() {
426        Ok(c) => c,
427        Err(e) => {
428            let _ = tx.blocking_send(Err(SofError::Storage(format!(
429                "failed to acquire SQLite connection: {e}"
430            ))));
431            return;
432        }
433    };
434
435    let mut stmt = match conn.prepare(sql) {
436        Ok(s) => s,
437        Err(e) => {
438            let _ = tx.blocking_send(Err(SofError::Backend(format!(
439                "failed to prepare SQL: {e}"
440            ))));
441            return;
442        }
443    };
444
445    // Build the bound-parameter list: tenant_id, resource_type, then the
446    // typed constants + runtime filters from `extra_params`.
447    let mut all_params: Vec<SqliteParam> = Vec::with_capacity(2 + extra_params.len());
448    all_params.push(SqliteParam::Text(tenant_id.to_string()));
449    all_params.push(SqliteParam::Text(resource_type.to_string()));
450    all_params.extend(extra_params);
451
452    let row_iter = {
453        match stmt.query_map(rusqlite::params_from_iter(all_params.iter()), |row| {
454            map_sqlite_row(row, columns)
455        }) {
456            Ok(iter) => iter,
457            Err(e) => {
458                let _ = tx.blocking_send(Err(SofError::Backend(format!(
459                    "query execution failed: {e}"
460                ))));
461                return;
462            }
463        }
464    };
465
466    let mut count = 0usize;
467    for row_result in row_iter {
468        if let Some(cap) = limit {
469            if count >= cap {
470                break;
471            }
472        }
473        count += 1;
474
475        let row = match row_result {
476            Ok(map) => Ok(Value::Object(map)),
477            Err(e) => Err(SofError::Backend(format!("row error: {e}"))),
478        };
479
480        if tx.blocking_send(row).is_err() {
481            // Receiver dropped (client disconnected) — stop iterating
482            break;
483        }
484    }
485
486    debug!(
487        runner = "sqlite-indb",
488        rows = count,
489        "in-DB view run complete"
490    );
491    // tx is dropped here, closing the ReceiverStream on the consumer side
492}
493
494fn map_sqlite_row(
495    row: &rusqlite::Row<'_>,
496    columns: &[String],
497) -> rusqlite::Result<Map<String, Value>> {
498    let mut map = Map::new();
499    for (i, name) in columns.iter().enumerate() {
500        let val = match row.get_ref(i)? {
501            ValueRef::Null => Value::Null,
502            ValueRef::Integer(n) => Value::from(n),
503            ValueRef::Real(f) => {
504                Value::from(serde_json::Number::from_f64(f).unwrap_or(serde_json::Number::from(0)))
505            }
506            ValueRef::Text(b) => {
507                let s = String::from_utf8_lossy(b).into_owned();
508                serde_json::from_str(&s).unwrap_or(Value::String(s))
509            }
510            ValueRef::Blob(b) => {
511                let s = String::from_utf8_lossy(b).into_owned();
512                serde_json::from_str(&s).unwrap_or(Value::String(s))
513            }
514        };
515        if val != Value::Null {
516            map.insert(name.clone(), val);
517        }
518    }
519    Ok(map)
520}