use deadpool_postgres::Pool;
use futures::StreamExt as _;
use helios_fhir::FhirVersion;
use serde_json::{Map, Value};
use tokio_stream::wrappers::ReceiverStream;
use tracing::debug;
use crate::core::sof_runner::{RowStream, SofError, SofRunner, ViewFilters, ViewRow};
use crate::tenant::TenantContext;
use super::compiler::{SqlDialect, compile_view_definition_dialect};
const CHANNEL_BUFFER: usize = 256;
pub struct PgInDbRunner {
pool: Pool,
fhir_version: FhirVersion,
}
impl PgInDbRunner {
pub fn new(pool: Pool) -> Self {
Self {
pool,
fhir_version: FhirVersion::default_enabled(),
}
}
pub fn with_fhir_version(mut self, version: FhirVersion) -> Self {
self.fhir_version = version;
self
}
}
#[async_trait::async_trait]
impl SofRunner for PgInDbRunner {
fn runner_name(&self) -> &'static str {
"postgres-indb"
}
async fn run_view(
&self,
tenant: &TenantContext,
view_definition: Value,
mut filters: ViewFilters,
) -> Result<RowStream, SofError> {
let compiled = compile_view_definition_dialect(
&view_definition,
SqlDialect::Postgres,
self.fhir_version,
)?;
debug!(
runner = "postgres-indb",
tenant = %tenant.tenant_id(),
"executing compiled ViewDefinition"
);
let tenant_id = tenant.tenant_id().to_string();
let resource_type = view_definition
.get("resource")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if !filters.group.is_empty() {
let resolved =
resolve_group_refs_to_patient_refs(&self.pool, &tenant_id, &filters.group).await?;
for p in resolved {
if !filters.patient.iter().any(|existing| existing == &p) {
filters.patient.push(p);
}
}
filters.group.clear();
}
let limit = filters.limit;
let columns = compiled.columns.clone();
let pool = self.pool.clone();
let (sql, params) = build_pg_sql_and_params(
&compiled.sql,
tenant_id,
resource_type,
&compiled.constants,
&filters,
self.fhir_version,
);
let (tx, rx) = tokio::sync::mpsc::channel::<Result<ViewRow, SofError>>(CHANNEL_BUFFER);
tokio::spawn(async move {
stream_pg_rows(pool, sql, params, columns, limit, tx).await;
});
Ok(Box::pin(ReceiverStream::new(rx)))
}
}
async fn resolve_group_refs_to_patient_refs(
pool: &Pool,
tenant_id: &str,
group_refs: &[String],
) -> Result<Vec<String>, SofError> {
if group_refs.is_empty() {
return Ok(Vec::new());
}
let client = pool
.get()
.await
.map_err(|e| SofError::Storage(format!("failed to get pg connection: {e}")))?;
let stmt = client
.prepare(
"SELECT data FROM resources \
WHERE tenant_id = $1 \
AND resource_type = 'Group' \
AND id = $2 \
AND is_deleted = false",
)
.await
.map_err(|e| SofError::Storage(format!("prepare failed: {e}")))?;
let mut groups = Vec::with_capacity(group_refs.len());
for r in group_refs {
let id = r.strip_prefix("Group/").unwrap_or(r);
match client.query_opt(&stmt, &[&tenant_id, &id]).await {
Ok(Some(row)) => {
let data: Value = row.get(0);
groups.push(data);
}
Ok(None) => continue,
Err(e) => {
return Err(SofError::Storage(format!(
"group lookup failed for {r}: {e}"
)));
}
}
}
let set = helios_sof::resolve_group_members_to_patient_refs(group_refs, &groups);
Ok(set.into_iter().collect())
}
fn build_pg_sql_and_params(
base_sql: &str,
tenant_id: String,
resource_type: String,
constants: &[super::ir::LitValue],
filters: &ViewFilters,
fhir_version: FhirVersion,
) -> (String, Vec<PgParam>) {
let mut conditions: Vec<String> = Vec::new();
let mut extra: Vec<PgParam> = Vec::new();
let mut constant_params: Vec<PgParam> = Vec::with_capacity(constants.len());
for c in constants {
constant_params.push(PgParam::from_lit(c));
}
let mut next_param = 3usize + constants.len();
if let Some(since) = filters.since {
conditions.push(format!("r.last_updated >= ${next_param}"));
extra.push(PgParam::Timestamp(since));
next_param += 1;
}
if let Some(c) = compartment_filter_sql(
fhir_version,
"Patient",
&resource_type,
&filters.patient,
&mut next_param,
&mut extra,
) {
conditions.push(c);
}
if let Some(c) = compartment_filter_sql(
fhir_version,
"Group",
&resource_type,
&filters.group,
&mut next_param,
&mut extra,
) {
conditions.push(c);
}
let sql = if conditions.is_empty() {
base_sql.to_string()
} else {
let joined = conditions.join(" AND ");
inject_before_order_by(base_sql, &format!(" AND {joined}"))
};
let mut all_params = vec![PgParam::Text(tenant_id), PgParam::Text(resource_type)];
all_params.extend(constant_params);
all_params.extend(extra);
(sql, all_params)
}
fn compartment_filter_sql(
fhir_version: FhirVersion,
compartment_type: &str,
resource_type: &str,
compartment_refs: &[String],
next_param: &mut usize,
extra_params: &mut Vec<PgParam>,
) -> Option<String> {
if compartment_refs.is_empty() {
return None;
}
let canonical_prefix = format!("{}/", compartment_type);
if resource_type == compartment_type {
let mut ors: Vec<String> = Vec::with_capacity(compartment_refs.len());
for r in compartment_refs {
let id = r.strip_prefix(canonical_prefix.as_str()).unwrap_or(r);
let p = *next_param;
ors.push(format!("r.id = ${p}"));
extra_params.push(PgParam::Text(id.to_string()));
*next_param += 1;
}
return Some(format!("({})", ors.join(" OR ")));
}
let names = helios_fhir::compartment_params(fhir_version, compartment_type, resource_type);
if names.is_empty() {
return Some("1=0".to_string());
}
let mut name_placeholders = Vec::with_capacity(names.len());
for n in names {
let p = *next_param;
name_placeholders.push(format!("${p}"));
extra_params.push(PgParam::Text((*n).to_string()));
*next_param += 1;
}
let mut ref_placeholders = Vec::with_capacity(compartment_refs.len());
for r in compartment_refs {
let canonical = if r.starts_with(canonical_prefix.as_str()) {
r.clone()
} else {
format!("{}{}", canonical_prefix, r)
};
let p = *next_param;
ref_placeholders.push(format!("${p}"));
extra_params.push(PgParam::Text(canonical));
*next_param += 1;
}
Some(format!(
"EXISTS (SELECT 1 FROM search_index si \
WHERE si.tenant_id = $1 \
AND si.resource_type = $2 \
AND si.resource_id = r.id \
AND si.param_name IN ({}) \
AND si.value_reference IN ({}))",
name_placeholders.join(","),
ref_placeholders.join(",")
))
}
fn inject_before_order_by(sql: &str, extra: &str) -> String {
let search = ["\nORDER BY", " ORDER BY"];
for pat in search {
if let Some(pos) = sql.rfind(pat) {
let mut s = sql.to_string();
s.insert_str(pos, extra);
return s;
}
}
format!("{sql}{extra}")
}
#[derive(Clone)]
enum PgParam {
Text(String),
Bool(bool),
Int(i64),
Decimal(String),
Null,
Timestamp(chrono::DateTime<chrono::Utc>),
}
impl PgParam {
fn from_lit(v: &super::ir::LitValue) -> Self {
match v {
super::ir::LitValue::Null => PgParam::Null,
super::ir::LitValue::Bool(b) => PgParam::Bool(*b),
super::ir::LitValue::Int(n) => PgParam::Int(*n),
super::ir::LitValue::Decimal(s) => PgParam::Decimal(s.clone()),
super::ir::LitValue::Str(s) => PgParam::Text(s.clone()),
}
}
}
async fn stream_pg_rows(
pool: Pool,
sql: String,
params: Vec<PgParam>,
columns: Vec<String>,
limit: Option<usize>,
tx: tokio::sync::mpsc::Sender<Result<ViewRow, SofError>>,
) {
if let Err(e) = stream_pg_rows_inner(pool, sql, params, columns, limit, &tx).await {
let _ = tx.send(Err(e)).await;
}
}
async fn stream_pg_rows_inner(
pool: Pool,
sql: String,
params: Vec<PgParam>,
columns: Vec<String>,
limit: Option<usize>,
tx: &tokio::sync::mpsc::Sender<Result<ViewRow, SofError>>,
) -> Result<(), SofError> {
let client = pool
.get()
.await
.map_err(|e| SofError::Storage(format!("failed to acquire Postgres connection: {e}")))?;
if std::env::var("PG_SOF_DEBUG_ALL").is_ok() {
eprintln!("[PG_SOF_DEBUG_ALL] preparing\n--- SQL ---\n{sql}\n---");
}
let stmt = client.prepare(&sql).await.map_err(|e| {
if std::env::var("PG_SOF_DEBUG").is_ok() {
eprintln!("[PG_SOF_DEBUG] prepare failed: {e}\n--- SQL ---\n{sql}\n---");
}
SofError::Backend(format!("failed to prepare SQL: {e}"))
})?;
let boxed: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = params
.into_iter()
.map(|p| -> Box<dyn tokio_postgres::types::ToSql + Sync + Send> {
match p {
PgParam::Text(s) => Box::new(s),
PgParam::Bool(b) => Box::new(if b {
"true".to_string()
} else {
"false".to_string()
}),
PgParam::Int(n) => Box::new(n.to_string()),
PgParam::Decimal(s) => Box::new(s),
PgParam::Null => Box::new(None::<String>),
PgParam::Timestamp(dt) => Box::new(dt),
}
})
.collect();
let param_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = boxed
.iter()
.map(|b| b.as_ref() as &(dyn tokio_postgres::types::ToSql + Sync))
.collect();
let raw = client
.query_raw(&stmt, param_refs.iter().copied())
.await
.map_err(|e| {
if std::env::var("PG_SOF_DEBUG").is_ok() {
eprintln!("[PG_SOF_DEBUG] query failed: {e}\n--- SQL ---\n{sql}\n---");
}
SofError::Backend(format!("query execution failed: {e}"))
})?;
drop(param_refs);
drop(boxed);
futures::pin_mut!(raw);
let mut count = 0usize;
while let Some(row_result) = raw.next().await {
match row_result {
Ok(pg_row) => {
if let Some(cap) = limit {
if count >= cap {
break;
}
}
count += 1;
match row_to_json(&pg_row, &columns) {
Ok(row) => {
if tx.send(Ok(row)).await.is_err() {
break; }
}
Err(e) => {
let _ = tx.send(Err(e)).await;
break;
}
}
}
Err(e) => {
if std::env::var("PG_SOF_DEBUG").is_ok() {
eprintln!("[PG_SOF_DEBUG] row error: {e}\n--- SQL ---\n{sql}\n---");
}
let _ = tx
.send(Err(SofError::Backend(format!("row error: {e}"))))
.await;
break;
}
}
}
debug!(
runner = "postgres-indb",
rows = count,
"in-DB view run complete"
);
Ok(())
}
fn row_to_json(pg_row: &tokio_postgres::Row, columns: &[String]) -> Result<ViewRow, SofError> {
let mut map = Map::new();
for (i, name) in columns.iter().enumerate() {
let val: Option<String> = pg_row
.try_get(i)
.map_err(|e| SofError::Backend(format!("failed to read column '{name}': {e}")))?;
if let Some(s) = val {
let json_val = serde_json::from_str(&s).unwrap_or(Value::String(s));
map.insert(name.clone(), json_val);
}
}
Ok(Value::Object(map))
}