Skip to main content

helios_persistence/sof/
postgres.rs

//! PostgreSQL in-DB SQL-on-FHIR runner.
//!
//! [`PgInDbRunner`] compiles a ViewDefinition to a parameterised PostgreSQL
//! `SELECT` statement and executes it directly against the `resources` table,
//! bypassing in-process FHIRPath evaluation entirely.
//!
//! ## Streaming
//!
//! Rows are fetched lazily via `tokio_postgres::Client::query_raw` and sent
//! through a bounded `tokio::sync::mpsc` channel (buffer: 256) so the HTTP
//! layer can begin flushing before the full result set has been transferred.
//! The async fetch loop runs in a `tokio::spawn` task that holds the pooled
//! connection until the result set is exhausted, the row limit is reached, an
//! error occurs, or the consumer drops the receiver.
14
15use deadpool_postgres::Pool;
16use futures::StreamExt as _;
17use helios_fhir::FhirVersion;
18use serde_json::{Map, Value};
19use tokio_stream::wrappers::ReceiverStream;
20use tracing::debug;
21
22use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
23use crate::tenant::TenantContext;
24
25use super::compiler::{SqlDialect, compile_view_definition_dialect};
26
/// Capacity of the bounded row channel: the number of rows the fetch task
/// may queue ahead of the consumer before `send` has to wait.
const CHANNEL_BUFFER: usize = 256;
29
30/// SQL-on-FHIR runner that compiles ViewDefinitions to PostgreSQL SQL.
31pub struct PgInDbRunner {
32    pool: Pool,
33    fhir_version: FhirVersion,
34}
35
36impl PgInDbRunner {
37    /// Creates a new runner backed by the given connection pool. Uses the
38    /// default FHIR version (R4) for compile-time cardinality lookups; call
39    /// [`Self::with_fhir_version`] to override.
40    pub fn new(pool: Pool) -> Self {
41        Self {
42            pool,
43            fhir_version: FhirVersion::default_enabled(),
44        }
45    }
46
47    /// Returns a runner that consults the given FHIR version's field-type
48    /// table when validating `collection: false` columns.
49    pub fn with_fhir_version(mut self, version: FhirVersion) -> Self {
50        self.fhir_version = version;
51        self
52    }
53}
54
55#[async_trait::async_trait]
56impl SofRunner for PgInDbRunner {
57    fn runner_name(&self) -> &'static str {
58        "postgres-indb"
59    }
60
61    async fn run_view(
62        &self,
63        tenant: &TenantContext,
64        view_definition: Value,
65        mut filters: ViewFilters,
66    ) -> Result<RowStream, SofError> {
67        // Compile synchronously (cheap, no I/O)
68        let compiled = compile_view_definition_dialect(
69            &view_definition,
70            SqlDialect::Postgres,
71            self.fhir_version,
72        )?;
73
74        debug!(
75            runner = "postgres-indb",
76            tenant = %tenant.tenant_id(),
77            "executing compiled ViewDefinition"
78        );
79
80        let tenant_id = tenant.tenant_id().to_string();
81        let resource_type = view_definition
82            .get("resource")
83            .and_then(|v| v.as_str())
84            .unwrap_or("")
85            .to_string();
86
87        // Spec-correct `group` handling: resolve each Group/{id} to its
88        // `member.entity` Patient references and fold them into the patient
89        // filter. Same pattern as the SQLite runner.
90        if !filters.group.is_empty() {
91            let resolved =
92                resolve_group_refs_to_patient_refs(&self.pool, &tenant_id, &filters.group).await?;
93            for p in resolved {
94                if !filters.patient.iter().any(|existing| existing == &p) {
95                    filters.patient.push(p);
96                }
97            }
98            filters.group.clear();
99        }
100
101        let limit = filters.limit;
102        let columns = compiled.columns.clone();
103        let pool = self.pool.clone();
104
105        // Build SQL with runtime filters and collect typed params. The
106        // compiled query already reserves `$3..$N` for ViewDefinition
107        // constants; runtime filters allocate from the next free slot.
108        let (sql, params) = build_pg_sql_and_params(
109            &compiled.sql,
110            tenant_id,
111            resource_type,
112            &compiled.constants,
113            &filters,
114            self.fhir_version,
115        );
116
117        let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
118
119        tokio::spawn(async move {
120            stream_pg_rows(pool, sql, params, columns, limit, tx).await;
121        });
122
123        Ok(Box::pin(ReceiverStream::new(rx)))
124    }
125}
126
127/// Loads each `Group/{id}` from the `resources` table and extracts its
128/// `member.entity` Patient references via the shared
129/// [`helios_sof::resolve_group_members_to_patient_refs`]. Returns the
130/// union of those Patient refs across all supplied group refs. Unknown
131/// groups are silently skipped (matches the inline path; absent-target
132/// warning is audit item #5).
133async fn resolve_group_refs_to_patient_refs(
134    pool: &Pool,
135    tenant_id: &str,
136    group_refs: &[String],
137) -> Result<Vec<String>, SofError> {
138    if group_refs.is_empty() {
139        return Ok(Vec::new());
140    }
141    let client = pool
142        .get()
143        .await
144        .map_err(|e| SofError::Storage(format!("failed to get pg connection: {e}")))?;
145    let stmt = client
146        .prepare(
147            "SELECT data FROM resources \
148             WHERE tenant_id = $1 \
149               AND resource_type = 'Group' \
150               AND id = $2 \
151               AND is_deleted = false",
152        )
153        .await
154        .map_err(|e| SofError::Storage(format!("prepare failed: {e}")))?;
155
156    let mut groups = Vec::with_capacity(group_refs.len());
157    for r in group_refs {
158        let id = r.strip_prefix("Group/").unwrap_or(r);
159        match client.query_opt(&stmt, &[&tenant_id, &id]).await {
160            Ok(Some(row)) => {
161                let data: Value = row.get(0);
162                groups.push(data);
163            }
164            Ok(None) => continue,
165            Err(e) => {
166                return Err(SofError::Storage(format!(
167                    "group lookup failed for {r}: {e}"
168                )));
169            }
170        }
171    }
172
173    let set = helios_sof::resolve_group_members_to_patient_refs(group_refs, &groups);
174    Ok(set.into_iter().collect())
175}
176
177// ============================================================================
178// SQL runtime-filter injection
179// ============================================================================
180
181/// Builds the final SQL and typed params list for a PG query.
182///
183/// The base SQL uses `$1 = tenant_id` and `$2 = resource_type`.
184/// Extra filter conditions inject `$3`, `$4`, … as needed.
185fn build_pg_sql_and_params(
186    base_sql: &str,
187    tenant_id: String,
188    resource_type: String,
189    constants: &[super::ir::LitValue],
190    filters: &ViewFilters,
191    fhir_version: FhirVersion,
192) -> (String, Vec<PgParam>) {
193    let mut conditions: Vec<String> = Vec::new();
194    let mut extra: Vec<PgParam> = Vec::new();
195    // Constants occupy `$3..$(2+constants.len())`; runtime filters start
196    // immediately after.
197    let mut constant_params: Vec<PgParam> = Vec::with_capacity(constants.len());
198    for c in constants {
199        constant_params.push(PgParam::from_lit(c));
200    }
201    let mut next_param = 3usize + constants.len();
202
203    if let Some(since) = filters.since {
204        conditions.push(format!("r.last_updated >= ${next_param}"));
205        extra.push(PgParam::Timestamp(since));
206        next_param += 1;
207    }
208
209    if let Some(c) = compartment_filter_sql(
210        fhir_version,
211        "Patient",
212        &resource_type,
213        &filters.patient,
214        &mut next_param,
215        &mut extra,
216    ) {
217        conditions.push(c);
218    }
219
220    if let Some(c) = compartment_filter_sql(
221        fhir_version,
222        "Group",
223        &resource_type,
224        &filters.group,
225        &mut next_param,
226        &mut extra,
227    ) {
228        conditions.push(c);
229    }
230
231    let sql = if conditions.is_empty() {
232        base_sql.to_string()
233    } else {
234        let joined = conditions.join(" AND ");
235        inject_before_order_by(base_sql, &format!(" AND {joined}"))
236    };
237
238    let mut all_params = vec![PgParam::Text(tenant_id), PgParam::Text(resource_type)];
239    all_params.extend(constant_params);
240    all_params.extend(extra);
241
242    (sql, all_params)
243}
244
245/// Builds a PostgreSQL `WHERE` fragment that filters `r` to resources in
246/// the named compartment of any of `compartment_refs`. Drives the lookup
247/// off the spec's `CompartmentDefinition` via
248/// [`helios_fhir::compartment_params`] and queries the pre-populated
249/// `search_index` table — no FHIRPath evaluation at query time.
250///
251/// See the matching SQLite implementation for algorithm details; the only
252/// difference here is `$N` parameter syntax instead of `?N`.
253fn compartment_filter_sql(
254    fhir_version: FhirVersion,
255    compartment_type: &str,
256    resource_type: &str,
257    compartment_refs: &[String],
258    next_param: &mut usize,
259    extra_params: &mut Vec<PgParam>,
260) -> Option<String> {
261    if compartment_refs.is_empty() {
262        return None;
263    }
264
265    let canonical_prefix = format!("{}/", compartment_type);
266
267    // Case 1: the view's resource is the compartment owner itself.
268    if resource_type == compartment_type {
269        let mut ors: Vec<String> = Vec::with_capacity(compartment_refs.len());
270        for r in compartment_refs {
271            let id = r.strip_prefix(canonical_prefix.as_str()).unwrap_or(r);
272            let p = *next_param;
273            ors.push(format!("r.id = ${p}"));
274            extra_params.push(PgParam::Text(id.to_string()));
275            *next_param += 1;
276        }
277        return Some(format!("({})", ors.join(" OR ")));
278    }
279
280    // Case 2: look up the search-param names that link `resource_type`
281    // to the compartment.
282    let names = helios_fhir::compartment_params(fhir_version, compartment_type, resource_type);
283    if names.is_empty() {
284        return Some("1=0".to_string());
285    }
286
287    let mut name_placeholders = Vec::with_capacity(names.len());
288    for n in names {
289        let p = *next_param;
290        name_placeholders.push(format!("${p}"));
291        extra_params.push(PgParam::Text((*n).to_string()));
292        *next_param += 1;
293    }
294
295    let mut ref_placeholders = Vec::with_capacity(compartment_refs.len());
296    for r in compartment_refs {
297        let canonical = if r.starts_with(canonical_prefix.as_str()) {
298            r.clone()
299        } else {
300            format!("{}{}", canonical_prefix, r)
301        };
302        let p = *next_param;
303        ref_placeholders.push(format!("${p}"));
304        extra_params.push(PgParam::Text(canonical));
305        *next_param += 1;
306    }
307
308    // `$1` and `$2` are tenant_id and resource_type (bound by the outer
309    // query); we reuse them inside the EXISTS subquery so the search_index
310    // join stays tenant-isolated and resource-typed.
311    Some(format!(
312        "EXISTS (SELECT 1 FROM search_index si \
313         WHERE si.tenant_id = $1 \
314           AND si.resource_type = $2 \
315           AND si.resource_id = r.id \
316           AND si.param_name IN ({}) \
317           AND si.value_reference IN ({}))",
318        name_placeholders.join(","),
319        ref_placeholders.join(",")
320    ))
321}
322
/// Splices `extra` into `sql` immediately before its last `ORDER BY`, or
/// appends it when there is none.
///
/// The newline-prefixed form (`\nORDER BY`) is looked for first; only if it
/// is absent anywhere in `sql` is the space-prefixed form (` ORDER BY`)
/// considered. In both cases the last occurrence is used. Matching is
/// case-sensitive.
fn inject_before_order_by(sql: &str, extra: &str) -> String {
    let split_at = ["\nORDER BY", " ORDER BY"]
        .iter()
        .find_map(|marker| sql.rfind(*marker))
        // No ORDER BY at all: splitting at the end appends `extra`.
        .unwrap_or(sql.len());

    let (head, tail) = sql.split_at(split_at);
    let mut out = String::with_capacity(sql.len() + extra.len());
    out.push_str(head);
    out.push_str(extra);
    out.push_str(tail);
    out
}
338
339// ============================================================================
340// Typed parameter enum — avoids the self-referential borrow issues with
341// `Vec<Box<dyn ToSql>>` + `Vec<&dyn ToSql>` that arise in async tasks.
342// ============================================================================
343
344#[derive(Clone)]
345enum PgParam {
346    Text(String),
347    Bool(bool),
348    Int(i64),
349    Decimal(String),
350    Null,
351    Timestamp(chrono::DateTime<chrono::Utc>),
352}
353
354impl PgParam {
355    /// Lifts a [`super::ir::LitValue`] (used by `ViewDefinition.constant[]`)
356    /// into the runtime parameter representation. Decimals bind as text and
357    /// rely on PG's implicit cast to `numeric` at the call site.
358    fn from_lit(v: &super::ir::LitValue) -> Self {
359        match v {
360            super::ir::LitValue::Null => PgParam::Null,
361            super::ir::LitValue::Bool(b) => PgParam::Bool(*b),
362            super::ir::LitValue::Int(n) => PgParam::Int(*n),
363            super::ir::LitValue::Decimal(s) => PgParam::Decimal(s.clone()),
364            super::ir::LitValue::Str(s) => PgParam::Text(s.clone()),
365        }
366    }
367}
368
369// ============================================================================
370// Async fetch loop
371// ============================================================================
372
373async fn stream_pg_rows(
374    pool: Pool,
375    sql: String,
376    params: Vec<PgParam>,
377    columns: Vec<String>,
378    limit: Option<usize>,
379    tx: tokio::sync::mpsc::Sender<Result<ViewRow, SofError>>,
380) {
381    if let Err(e) = stream_pg_rows_inner(pool, sql, params, columns, limit, &tx).await {
382        let _ = tx.send(Err(e)).await;
383    }
384}
385
386async fn stream_pg_rows_inner(
387    pool: Pool,
388    sql: String,
389    params: Vec<PgParam>,
390    columns: Vec<String>,
391    limit: Option<usize>,
392    tx: &tokio::sync::mpsc::Sender<Result<ViewRow, SofError>>,
393) -> Result<(), SofError> {
394    let client = pool
395        .get()
396        .await
397        .map_err(|e| SofError::Storage(format!("failed to acquire Postgres connection: {e}")))?;
398
399    if std::env::var("PG_SOF_DEBUG_ALL").is_ok() {
400        eprintln!("[PG_SOF_DEBUG_ALL] preparing\n--- SQL ---\n{sql}\n---");
401    }
402    let stmt = client.prepare(&sql).await.map_err(|e| {
403        if std::env::var("PG_SOF_DEBUG").is_ok() {
404            eprintln!("[PG_SOF_DEBUG] prepare failed: {e}\n--- SQL ---\n{sql}\n---");
405        }
406        SofError::Backend(format!("failed to prepare SQL: {e}"))
407    })?;
408
409    // Build boxed params for query_raw; these are 'static + Send
410    let boxed: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = params
411        .into_iter()
412        .map(|p| -> Box<dyn tokio_postgres::types::ToSql + Sync + Send> {
413            match p {
414                PgParam::Text(s) => Box::new(s),
415                // Bind Bool/Int/Decimal constants as text so they compare
416                // cleanly against `->>`/`#>>` JSON-text projections without
417                // a per-call PG type-mismatch. Numeric contexts apply
418                // explicit `::numeric` casts via `lower_binop_dialect`;
419                // boolean contexts compare against `'true'`/`'false'`.
420                PgParam::Bool(b) => Box::new(if b {
421                    "true".to_string()
422                } else {
423                    "false".to_string()
424                }),
425                PgParam::Int(n) => Box::new(n.to_string()),
426                PgParam::Decimal(s) => Box::new(s),
427                PgParam::Null => Box::new(None::<String>),
428                PgParam::Timestamp(dt) => Box::new(dt),
429            }
430        })
431        .collect();
432
433    // query_raw needs a slice of &dyn ToSql + Sync. Build references that borrow
434    // from `boxed` — both live in this async block's stack frame, so no lifetime
435    // issue (the future holds them until the stream is exhausted).
436    let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = boxed
437        .iter()
438        .map(|b| b.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
439        .collect();
440
441    let raw = client
442        .query_raw(&stmt, param_refs.iter().copied())
443        .await
444        .map_err(|e| {
445            if std::env::var("PG_SOF_DEBUG").is_ok() {
446                eprintln!("[PG_SOF_DEBUG] query failed: {e}\n--- SQL ---\n{sql}\n---");
447            }
448            SofError::Backend(format!("query execution failed: {e}"))
449        })?;
450
451    // params no longer needed after query_raw returns (data sent to DB)
452    drop(param_refs);
453    drop(boxed);
454
455    futures::pin_mut!(raw);
456
457    let mut count = 0usize;
458    while let Some(row_result) = raw.next().await {
459        match row_result {
460            Ok(pg_row) => {
461                if let Some(cap) = limit {
462                    if count >= cap {
463                        break;
464                    }
465                }
466                count += 1;
467                match row_to_json(&pg_row, &columns) {
468                    Ok(row) => {
469                        if tx.send(Ok(row)).await.is_err() {
470                            break; // receiver dropped
471                        }
472                    }
473                    Err(e) => {
474                        let _ = tx.send(Err(e)).await;
475                        break;
476                    }
477                }
478            }
479            Err(e) => {
480                if std::env::var("PG_SOF_DEBUG").is_ok() {
481                    eprintln!("[PG_SOF_DEBUG] row error: {e}\n--- SQL ---\n{sql}\n---");
482                }
483                let _ = tx
484                    .send(Err(SofError::Backend(format!("row error: {e}"))))
485                    .await;
486                break;
487            }
488        }
489    }
490
491    debug!(
492        runner = "postgres-indb",
493        rows = count,
494        "in-DB view run complete"
495    );
496    Ok(())
497    // tx dropped here, closing the ReceiverStream
498}
499
500// ============================================================================
501// Row → JSON conversion
502// ============================================================================
503
504/// Converts a `tokio_postgres::Row` into a `serde_json::Value` object.
505///
506/// The compiled SQL projects all columns as text via `->>`/`#>>` operators.
507fn row_to_json(pg_row: &tokio_postgres::Row, columns: &[String]) -> Result<ViewRow, SofError> {
508    let mut map = Map::new();
509    for (i, name) in columns.iter().enumerate() {
510        let val: Option<String> = pg_row
511            .try_get(i)
512            .map_err(|e| SofError::Backend(format!("failed to read column '{name}': {e}")))?;
513
514        if let Some(s) = val {
515            let json_val = serde_json::from_str(&s).unwrap_or(Value::String(s));
516            map.insert(name.clone(), json_val);
517        }
518    }
519    Ok(Value::Object(map))
520}